Merge branch 'master' into system_text_log

commit 14b384f3e5
Nikita Mikhaylov, 2019-07-23 15:51:04 +03:00, committed by GitHub
235 changed files with 3635 additions and 1246 deletions

View File

@ -15,6 +15,55 @@
### Bug Fixes
* Fixed a performance degradation in the JOIN method in some kinds of queries. [#5192](https://github.com/yandex/ClickHouse/pull/5192) ([Winter Zhang](https://github.com/zhang2014))
## ClickHouse release 19.9.2.4, 2019-06-24
### New Features
* Show information about "frozen" parts in the `system.parts` table. [#5471](https://github.com/yandex/ClickHouse/pull/5471) ([proller](https://github.com/proller))
* clickhouse-client asks for the password on startup if it is not specified in the arguments. [#5092](https://github.com/yandex/ClickHouse/pull/5092) ([proller](https://github.com/proller))
* Implemented the `dictGet` and `dictGetOrDefault` functions for Decimal types. [#5394](https://github.com/yandex/ClickHouse/pull/5394) ([Artem Zuikov](https://github.com/4ertus2))
### Improvements
* Debian init: added a stop timeout. [#5522](https://github.com/yandex/ClickHouse/pull/5522) ([proller](https://github.com/proller))
* Added settings to bypass the restriction on creating LowCardinality columns from unsuitable data types. [#5448](https://github.com/yandex/ClickHouse/pull/5448) ([Olga Khvostikova](https://github.com/stavrolia))
* Regression functions return the model weights (when not used as a state for the `evalMLMethod` function). [#5411](https://github.com/yandex/ClickHouse/pull/5411) ([Quid37](https://github.com/Quid37))
* Renamed and improved the regression functions. [#5492](https://github.com/yandex/ClickHouse/pull/5492) ([Quid37](https://github.com/Quid37))
* Improvements in the substring search interface. [#5586](https://github.com/yandex/ClickHouse/pull/5586) ([Danila Kutenin](https://github.com/danlark1))
### Bug Fixes
* Fixed potential data loss in Kafka. [#5445](https://github.com/yandex/ClickHouse/pull/5445) ([Ivan](https://github.com/abyss7))
* Fixed a potential infinite loop in the `PrettySpace` format when called with zero columns. [#5560](https://github.com/yandex/ClickHouse/pull/5560) ([Olga Khvostikova](https://github.com/stavrolia))
* Fixed UInt32 overflow in linear regression. Non-constant arguments are now supported in ML models. [#5516](https://github.com/yandex/ClickHouse/pull/5516) ([Nikolai Kochetov](https://github.com/KochetovNicolai))
* Fixed an erroneous exception on `ALTER TABLE ... DROP INDEX IF EXISTS ...` when the index does not exist. [#5524](https://github.com/yandex/ClickHouse/pull/5524) ([Gleb Novikov](https://github.com/NanoBjorn))
* Fixed a crash of the `bitmapHasAny` function in scalar subqueries. [#5528](https://github.com/yandex/ClickHouse/pull/5528) ([Zhichang Yu](https://github.com/yuzhichang))
* Fixed a DNS update bug in the replication pool where data was not refreshed after the DNS cache was cleared. [#5534](https://github.com/yandex/ClickHouse/pull/5534) ([alesapin](https://github.com/alesapin))
* Fixed `ALTER ... MODIFY TTL` on ReplicatedMergeTree. [#5539](https://github.com/yandex/ClickHouse/pull/5539) ([Anton Popov](https://github.com/CurtizJ))
* Fixed INSERT into a Distributed table with MATERIALIZED columns. [#5429](https://github.com/yandex/ClickHouse/pull/5429) ([Azat Khuzhin](https://github.com/azat))
* Fixed a memory allocation bug when truncating tables with the Join engine. [#5437](https://github.com/yandex/ClickHouse/pull/5437) ([TCeason](https://github.com/TCeason))
* Fixed time zones. [#5443](https://github.com/yandex/ClickHouse/pull/5443) ([Ivan](https://github.com/abyss7))
* Fixed a bug in the MultiVolnitsky algorithm that in rare cases led to incorrect results of the `multiSearchAny` function. [#5588](https://github.com/yandex/ClickHouse/pull/5588) ([Danila Kutenin](https://github.com/danlark1))
* Fixed a bug where some settings were not passed through for HTTP queries. [#5455](https://github.com/yandex/ClickHouse/pull/5455) ([Danila Kutenin](https://github.com/danlark1))
* Fixed a bug where data parts were removed from the filesystem but remained in ZooKeeper. [#5520](https://github.com/yandex/ClickHouse/pull/5520) ([alesapin](https://github.com/alesapin))
* Removed debug output from the MySQL protocol. [#5478](https://github.com/yandex/ClickHouse/pull/5478) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Skip ZNONODE while processing DDL queries. [#5489](https://github.com/yandex/ClickHouse/pull/5489) ([Azat Khuzhin](https://github.com/azat))
* Fixed a `UNION ALL` bug with different result types returned from subqueries. [#5503](https://github.com/yandex/ClickHouse/pull/5503) ([Artem Zuikov](https://github.com/4ertus2))
* Throw an exception instead of crashing on an invalid integer type in the `dictGetT` functions. [#5446](https://github.com/yandex/ClickHouse/pull/5446) ([Artem Zuikov](https://github.com/4ertus2))
* Fixed incorrect element_count and load_factor values in the `system.dictionaries` table for hashed dictionaries. [#5440](https://github.com/yandex/ClickHouse/pull/5440) ([Azat Khuzhin](https://github.com/azat))
### Build/Testing/Packaging Improvements
* Fixed the build with `Brotli` compression disabled (cmake variable `ENABLE_BROTLI=OFF`). [#5521](https://github.com/yandex/ClickHouse/pull/5521) ([Anton Yuzhaninov](https://github.com/citrin))
* Fixed the inclusion of roaring.h. [#5523](https://github.com/yandex/ClickHouse/pull/5523) ([Orivej Desh](https://github.com/orivej))
* Fixed gcc9 warnings in hyperscan. [#5546](https://github.com/yandex/ClickHouse/pull/5546) ([Danila Kutenin](https://github.com/danlark1))
* Fixed warnings when building with gcc-9. [#5498](https://github.com/yandex/ClickHouse/pull/5498) ([Danila Kutenin](https://github.com/danlark1))
* Fixed linking with lld. [#5477](https://github.com/yandex/ClickHouse/pull/5477) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Removed unnecessary template specializations in the dictionaries code. [#5452](https://github.com/yandex/ClickHouse/pull/5452) ([Artem Zuikov](https://github.com/4ertus2))
* Improvements in performance tests. [#5497](https://github.com/yandex/ClickHouse/pull/5497) ([Olga Khvostikova](https://github.com/stavrolia))
* Fixes for parallel test execution. [#5506](https://github.com/yandex/ClickHouse/pull/5506) ([proller](https://github.com/proller))
* Docker uses the config from clickhouse-test. [#5531](https://github.com/yandex/ClickHouse/pull/5531) ([proller](https://github.com/proller))
* Fixed the FreeBSD build. [#5447](https://github.com/yandex/ClickHouse/pull/5447) ([proller](https://github.com/proller))
* Updated boost to 1.70. [#5570](https://github.com/yandex/ClickHouse/pull/5570) ([proller](https://github.com/proller))
* Fixed building clickhouse as a submodule. [#5574](https://github.com/yandex/ClickHouse/pull/5574) ([proller](https://github.com/proller))
* Improved the performance test of the JSONExtract functions. [#5444](https://github.com/yandex/ClickHouse/pull/5444) ([Vitaly Baranov](https://github.com/vitlibar))
## ClickHouse release 19.8.3.8, 2019-06-11
### New Features

View File

@ -182,6 +182,11 @@ else ()
set (CXX_FLAGS_INTERNAL_COMPILER "-std=c++1z")
endif ()
if (COMPILER_GCC OR COMPILER_CLANG)
# Enable C++14 sized global deallocation functions. It should be enabled by setting -std=c++14 but I'm not sure.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsized-deallocation")
endif ()
option(WITH_COVERAGE "Build with coverage." 0)
if(WITH_COVERAGE AND COMPILER_CLANG)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -fprofile-instr-generate -fcoverage-mapping")
@ -220,7 +225,6 @@ endif ()
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package (Threads)
include (cmake/find_cxxabi.cmake)
include (cmake/find_cxx.cmake)
include (cmake/test_compiler.cmake)
@ -404,6 +408,11 @@ if (UNBUNDLED OR NOT (OS_LINUX OR APPLE) OR ARCH_32)
option (NO_WERROR "Disable -Werror compiler option" ON)
endif ()
if (USE_LIBCXX)
set (HAVE_LIBCXX 1)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
endif()
if (USE_LIBCXX AND USE_INTERNAL_LIBCXX_LIBRARY)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -nostdinc++ -isystem ${LIBCXX_INCLUDE_DIR} -isystem ${LIBCXXABI_INCLUDE_DIR}")
endif ()
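
A quick aside on the -fsized-deallocation flag enabled above. The following is a minimal, hypothetical illustration (not ClickHouse code) of what sized global deallocation means: with the flag, the compiler may call the two-argument operator delete for delete-expressions, which lets an allocator skip looking up the allocation size.

#include <cstddef>
#include <cstdio>
#include <new>

/// Replacing only the sized form is allowed; with -fsized-deallocation the
/// compiler prefers it for ordinary delete-expressions.
void operator delete(void * ptr, std::size_t size) noexcept
{
    std::printf("sized delete of %zu bytes\n", size);
    ::operator delete(ptr);  /// fall back to the unsized global form
}

int main()
{
    int * p = new int(42);
    delete p;  /// may call the sized overload with size == sizeof(int)
}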

View File

@ -7,7 +7,7 @@ ClickHouse is an open-source column-oriented database management system that all
* [Official website](https://clickhouse.yandex/) has quick high-level overview of ClickHouse on main page.
* [Tutorial](https://clickhouse.yandex/tutorial.html) shows how to set up and query small ClickHouse cluster.
* [Documentation](https://clickhouse.yandex/docs/en/) provides more in-depth information.
* [YouTube channel](https://www.youtube.com/channel/UChtmrD-dsdpspr42P_PyRAw) has a lot of content about ClickHouse in video format.
* [YouTube channel](https://www.youtube.com/c/ClickHouseDB) has a lot of content about ClickHouse in video format.
* [Blog](https://clickhouse.yandex/blog/en/) contains various ClickHouse-related articles, as well as announces and reports about events.
* [Contacts](https://clickhouse.yandex/#contacts) can help to get your questions answered if there are any.
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.

View File

@ -1,23 +1,26 @@
if (NOT APPLE)
option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx library instead of bundled" ${NOT_UNBUNDLED})
option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx and libcxxabi libraries instead of bundled" ${NOT_UNBUNDLED})
endif ()
if (USE_INTERNAL_LIBCXX_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libcxx/include/vector")
message (WARNING "submodule contrib/libcxx is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_LIBCXX_LIBRARY 0)
message (WARNING "submodule contrib/libcxx is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_LIBCXX_LIBRARY 0)
endif ()
if (USE_INTERNAL_LIBCXX_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/src")
message (WARNING "submodule contrib/libcxxabi is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_LIBCXXABI_LIBRARY 0)
endif ()
if (NOT USE_INTERNAL_LIBCXX_LIBRARY)
find_library (LIBCXX_LIBRARY c++)
find_path (LIBCXX_INCLUDE_DIR NAMES vector PATHS ${LIBCXX_INCLUDE_PATHS})
endif ()
if (LIBCXX_LIBRARY AND LIBCXX_INCLUDE_DIR)
find_library (LIBCXXABI_LIBRARY c++abi)
else ()
set (LIBCXX_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxx/include)
set (USE_INTERNAL_LIBCXX_LIBRARY 1)
set (LIBCXXABI_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/include)
set (LIBCXX_LIBRARY cxx_static)
set (HAVE_LIBCXX 1)
set (LIBCXXABI_LIBRARY cxxabi_static)
endif ()
message (STATUS "Using libcxx: ${LIBCXX_INCLUDE_DIR} : ${LIBCXX_LIBRARY}")
message (STATUS "Using libcxx: ${LIBCXX_LIBRARY}")
message (STATUS "Using libcxxabi: ${LIBCXXABI_LIBRARY}")

View File

@ -1,22 +0,0 @@
if (NOT APPLE)
option (USE_INTERNAL_LIBCXXABI_LIBRARY "Set to FALSE to use system libcxxabi library instead of bundled" ${NOT_UNBUNDLED})
endif ()
if (USE_INTERNAL_LIBCXXABI_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/src")
message (WARNING "submodule contrib/libcxxabi is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_LIBCXXABI_LIBRARY 0)
endif ()
if (NOT USE_INTERNAL_LIBCXXABI_LIBRARY)
find_library (LIBCXXABI_LIBRARY cxxabi)
find_path (LIBCXXABI_INCLUDE_DIR NAMES vector PATHS ${LIBCXXABI_INCLUDE_PATHS})
endif ()
if (LIBCXXABI_LIBRARY AND LIBCXXABI_INCLUDE_DIR)
else ()
set (LIBCXXABI_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/include)
set (USE_INTERNAL_LIBCXXABI_LIBRARY 1)
set (LIBCXXABI_LIBRARY cxxabi_static)
endif ()
message (STATUS "Using libcxxabi: ${LIBCXXABI_INCLUDE_DIR} : ${LIBCXXABI_LIBRARY}")

View File

@ -13,7 +13,10 @@ if (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL)
list (APPEND POCO_COMPONENTS Crypto NetSSL)
endif ()
if (NOT DEFINED ENABLE_POCO_MONGODB OR ENABLE_POCO_MONGODB)
set(ENABLE_POCO_MONGODB 1 CACHE BOOL "")
list (APPEND POCO_COMPONENTS MongoDB)
else ()
set(ENABLE_POCO_MONGODB 0 CACHE BOOL "")
endif ()
# TODO: after new poco release with SQL library rename ENABLE_POCO_ODBC -> ENABLE_POCO_SQLODBC
if (NOT DEFINED ENABLE_POCO_ODBC OR ENABLE_POCO_ODBC)
@ -37,6 +40,7 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
set (ENABLE_DATA_MYSQL 0 CACHE BOOL "")
set (ENABLE_DATA_POSTGRESQL 0 CACHE BOOL "")
set (ENABLE_ENCODINGS 0 CACHE BOOL "")
set (ENABLE_MONGODB ${ENABLE_POCO_MONGODB} CACHE BOOL "" FORCE)
# new after 2.0.0:
set (POCO_ENABLE_ZIP 0 CACHE BOOL "")
@ -60,7 +64,7 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
"${ClickHouse_SOURCE_DIR}/contrib/poco/Util/include/"
)
if (NOT DEFINED ENABLE_POCO_MONGODB OR ENABLE_POCO_MONGODB)
if (ENABLE_POCO_MONGODB)
set (Poco_MongoDB_LIBRARY PocoMongoDB)
set (Poco_MongoDB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/poco/MongoDB/include/")
endif ()

View File

@ -15,12 +15,9 @@ if (USE_INTERNAL_UNWIND_LIBRARY)
add_subdirectory (libunwind-cmake)
endif ()
if (USE_LIBCXX AND USE_INTERNAL_LIBCXXABI_LIBRARY)
add_subdirectory(libcxxabi-cmake)
endif()
if (USE_LIBCXX AND USE_INTERNAL_LIBCXX_LIBRARY)
add_subdirectory(libcxx-cmake)
add_subdirectory(libcxxabi-cmake)
endif()

View File

@ -15,7 +15,6 @@ ${JEMALLOC_SOURCE_DIR}/src/extent_mmap.c
${JEMALLOC_SOURCE_DIR}/src/hash.c
${JEMALLOC_SOURCE_DIR}/src/hook.c
${JEMALLOC_SOURCE_DIR}/src/jemalloc.c
${JEMALLOC_SOURCE_DIR}/src/jemalloc_cpp.cpp
${JEMALLOC_SOURCE_DIR}/src/large.c
${JEMALLOC_SOURCE_DIR}/src/log.c
${JEMALLOC_SOURCE_DIR}/src/malloc_io.c

contrib/libunwind vendored

@ -1 +1 @@
Subproject commit ec86b1c6a2c6b8ba316f429db9a6d4122dd12710
Subproject commit 17a48fbfa7913ee889960a698516bd3ba51d63ee

contrib/poco vendored

@ -1 +1 @@
Subproject commit ece721f1085e3894cb5286e8560af84cd1445326
Subproject commit ea2516be366a73a02a82b499ed4a7db1d40037e0

View File

@ -231,6 +231,8 @@ target_link_libraries(clickhouse_common_io
Threads::Threads
PRIVATE
${CMAKE_DL_LIBS}
PRIVATE
rt
PUBLIC
roaring
)
@ -385,6 +387,7 @@ endif()
if (USE_JEMALLOC)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR}) # used in Interpreters/AsynchronousMetrics.cpp
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR}) # new_delete.cpp
endif ()
target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/src/Formats/include)

View File

@ -1,11 +1,11 @@
# This strings autochanged from release_lib.sh:
set(VERSION_REVISION 54424)
set(VERSION_REVISION 54425)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 12)
set(VERSION_MINOR 13)
set(VERSION_PATCH 1)
set(VERSION_GITHASH a584f0ca6cb5df9b0d9baf1e2e1eaa7d12a20a44)
set(VERSION_DESCRIBE v19.12.1.1-prestable)
set(VERSION_STRING 19.12.1.1)
set(VERSION_GITHASH adfc36917222bdb03eba069f0cad0f4f5b8f1c94)
set(VERSION_DESCRIBE v19.13.1.1-prestable)
set(VERSION_STRING 19.13.1.1)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -305,9 +305,9 @@ void MySQLHandler::authenticate(const HandshakeResponse & handshake_response, co
LOG_TRACE(log, "Received empty password");
}
if (!password.empty())
if (!password.empty() && password.back() == 0)
{
password.pop_back(); /// terminating null byte
password.pop_back();
}
try

View File

@ -508,6 +508,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
LOG_DEBUG(log, "Loaded metadata.");
/// Init trace collector only after trace_log system table was created
global_context->initializeTraceCollector();
global_context->setCurrentDatabase(default_database);
if (has_zookeeper && config().has("distributed_ddl"))

View File

@ -384,7 +384,10 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
{
const auto & db_and_table = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
{
if (!db_and_table.second.empty())
sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
}
}
/// Send block to the client - table structure.

View File

@ -294,6 +294,16 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_log>
<!-- Trace log. Stores stack traces collected by query profilers.
See query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns settings. -->
<trace_log>
<database>system</database>
<table>trace_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</trace_log>
<!-- Query thread log. Has information about all threads participated in query execution.
Used only for queries with setting log_query_threads = 1. -->
<query_thread_log>

View File

@ -45,11 +45,11 @@ namespace
/// Such default parameters were picked because they performed well in some tests,
/// though parameter tuning is still required to achieve better results
auto learning_rate = Float64(0.01);
auto l2_reg_coef = Float64(0.1);
UInt32 batch_size = 15;
auto learning_rate = Float64(1.0);
auto l2_reg_coef = Float64(0.5);
UInt64 batch_size = 15;
std::string weights_updater_name = "SGD";
std::string weights_updater_name = "Adam";
std::unique_ptr<IGradientComputer> gradient_computer;
if (!parameters.empty())
@ -62,12 +62,12 @@ namespace
}
if (parameters.size() > 2)
{
batch_size = applyVisitor(FieldVisitorConvertToNumber<UInt32>(), parameters[2]);
batch_size = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), parameters[2]);
}
if (parameters.size() > 3)
{
weights_updater_name = parameters[3].safeGet<String>();
if (weights_updater_name != "SGD" && weights_updater_name != "Momentum" && weights_updater_name != "Nesterov")
if (weights_updater_name != "SGD" && weights_updater_name != "Momentum" && weights_updater_name != "Nesterov" && weights_updater_name != "Adam")
throw Exception("Invalid parameter for weights updater. The only supported are 'SGD', 'Momentum' and 'Nesterov'",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
@ -106,8 +106,8 @@ void registerAggregateFunctionMLMethod(AggregateFunctionFactory & factory)
LinearModelData::LinearModelData(
Float64 learning_rate,
Float64 l2_reg_coef,
UInt32 param_num,
UInt32 batch_capacity,
UInt64 param_num,
UInt64 batch_capacity,
std::shared_ptr<DB::IGradientComputer> gradient_computer,
std::shared_ptr<DB::IWeightsUpdater> weights_updater)
: learning_rate(learning_rate)
@ -126,7 +126,7 @@ void LinearModelData::update_state()
if (batch_size == 0)
return;
weights_updater->update(batch_size, weights, bias, gradient_batch);
weights_updater->update(batch_size, weights, bias, learning_rate, gradient_batch);
batch_size = 0;
++iter_num;
gradient_batch.assign(gradient_batch.size(), Float64{0.0});
@ -191,6 +191,7 @@ void LinearModelData::merge(const DB::LinearModelData & rhs)
update_state();
/// can't update rhs state because it's constant
/// The squared mean is more stable (in the sense of prediction quality) when two states with quite different numbers of learning steps are merged
Float64 frac = (static_cast<Float64>(iter_num) * iter_num) / (iter_num * iter_num + rhs.iter_num * rhs.iter_num);
for (size_t i = 0; i < weights.size(); ++i)
@ -210,7 +211,7 @@ void LinearModelData::add(const IColumn ** columns, size_t row_num)
/// Here we have columns + 1 as first column corresponds to target value, and others - to features
weights_updater->add_to_batch(
gradient_batch, *gradient_computer, weights, bias, learning_rate, l2_reg_coef, target, columns + 1, row_num);
gradient_batch, *gradient_computer, weights, bias, l2_reg_coef, target, columns + 1, row_num);
++batch_size;
if (batch_size == batch_capacity)
@ -219,6 +220,90 @@ void LinearModelData::add(const IColumn ** columns, size_t row_num)
}
}
/// Weights updaters
void Adam::write(WriteBuffer & buf) const
{
writeBinary(average_gradient, buf);
writeBinary(average_squared_gradient, buf);
}
void Adam::read(ReadBuffer & buf)
{
readBinary(average_gradient, buf);
readBinary(average_squared_gradient, buf);
}
void Adam::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
auto & adam_rhs = static_cast<const Adam &>(rhs);
if (average_gradient.empty())
{
if (!average_squared_gradient.empty() ||
adam_rhs.average_gradient.size() != adam_rhs.average_squared_gradient.size())
throw Exception("Average_gradient and average_squared_gradient must have same size", ErrorCodes::LOGICAL_ERROR);
average_gradient.resize(adam_rhs.average_gradient.size(), Float64{0.0});
average_squared_gradient.resize(adam_rhs.average_squared_gradient.size(), Float64{0.0});
}
for (size_t i = 0; i < average_gradient.size(); ++i)
{
average_gradient[i] = average_gradient[i] * frac + adam_rhs.average_gradient[i] * rhs_frac;
average_squared_gradient[i] = average_squared_gradient[i] * frac + adam_rhs.average_squared_gradient[i] * rhs_frac;
}
beta1_powered_ *= adam_rhs.beta1_powered_;
beta2_powered_ *= adam_rhs.beta2_powered_;
}
void Adam::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
if (average_gradient.empty())
{
if (!average_squared_gradient.empty())
throw Exception("Average_gradient and average_squared_gradient must have same size", ErrorCodes::LOGICAL_ERROR);
average_gradient.resize(batch_gradient.size(), Float64{0.0});
average_squared_gradient.resize(batch_gradient.size(), Float64{0.0});
}
for (size_t i = 0; i != average_gradient.size(); ++i)
{
Float64 normed_gradient = batch_gradient[i] / batch_size;
average_gradient[i] = beta1_ * average_gradient[i] + (1 - beta1_) * normed_gradient;
average_squared_gradient[i] = beta2_ * average_squared_gradient[i] +
(1 - beta2_) * normed_gradient * normed_gradient;
}
for (size_t i = 0; i < weights.size(); ++i)
{
weights[i] += (learning_rate * average_gradient[i]) /
((1 - beta1_powered_) * (sqrt(average_squared_gradient[i] / (1 - beta2_powered_)) + eps_));
}
bias += (learning_rate * average_gradient[weights.size()]) /
((1 - beta1_powered_) * (sqrt(average_squared_gradient[weights.size()] / (1 - beta2_powered_)) + eps_));
beta1_powered_ *= beta1_;
beta2_powered_ *= beta2_;
}
void Adam::add_to_batch(
std::vector<Float64> & batch_gradient,
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num)
{
if (average_gradient.empty())
{
average_gradient.resize(batch_gradient.size(), Float64{0.0});
average_squared_gradient.resize(batch_gradient.size(), Float64{0.0});
}
gradient_computer.compute(batch_gradient, weights, bias, l2_reg_coef, target, columns, row_num);
}
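
For reference, Adam::update above implements the standard Adam rule with bias correction. Using the usual notation (an assumption: g_t is the batch-averaged gradient, eta the learning rate, and beta1_powered_/beta2_powered_ track the powers beta_1^t and beta_2^t), and noting that the gradient computers in this file accumulate the ascent direction, hence the plus sign:

\[
m_t = \beta_1 m_{t-1} + (1 - \beta_1)\, g_t, \qquad
v_t = \beta_2 v_{t-1} + (1 - \beta_2)\, g_t^2,
\]
\[
w_{t+1} = w_t + \eta \cdot \frac{m_t / (1 - \beta_1^t)}{\sqrt{v_t / (1 - \beta_2^t)} + \epsilon}.
\]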
void Nesterov::read(ReadBuffer & buf)
{
@ -233,13 +318,16 @@ void Nesterov::write(WriteBuffer & buf) const
void Nesterov::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
auto & nesterov_rhs = static_cast<const Nesterov &>(rhs);
if (accumulated_gradient.empty())
accumulated_gradient.resize(nesterov_rhs.accumulated_gradient.size(), Float64{0.0});
for (size_t i = 0; i < accumulated_gradient.size(); ++i)
{
accumulated_gradient[i] = accumulated_gradient[i] * frac + nesterov_rhs.accumulated_gradient[i] * rhs_frac;
}
}
void Nesterov::update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
void Nesterov::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
if (accumulated_gradient.empty())
{
@ -248,7 +336,7 @@ void Nesterov::update(UInt32 batch_size, std::vector<Float64> & weights, Float64
for (size_t i = 0; i < batch_gradient.size(); ++i)
{
accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + batch_gradient[i] / batch_size;
accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + (learning_rate * batch_gradient[i]) / batch_size;
}
for (size_t i = 0; i < weights.size(); ++i)
{
@ -262,7 +350,6 @@ void Nesterov::add_to_batch(
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -280,7 +367,7 @@ void Nesterov::add_to_batch(
}
auto shifted_bias = bias + accumulated_gradient[weights.size()] * alpha_;
gradient_computer.compute(batch_gradient, shifted_weights, shifted_bias, learning_rate, l2_reg_coef, target, columns, row_num);
gradient_computer.compute(batch_gradient, shifted_weights, shifted_bias, l2_reg_coef, target, columns, row_num);
}
void Momentum::read(ReadBuffer & buf)
@ -302,7 +389,7 @@ void Momentum::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac
}
}
void Momentum::update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
void Momentum::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
/// batch_size is already checked to be greater than 0
if (accumulated_gradient.empty())
@ -312,7 +399,7 @@ void Momentum::update(UInt32 batch_size, std::vector<Float64> & weights, Float64
for (size_t i = 0; i < batch_gradient.size(); ++i)
{
accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + batch_gradient[i] / batch_size;
accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + (learning_rate * batch_gradient[i]) / batch_size;
}
for (size_t i = 0; i < weights.size(); ++i)
{
@ -322,14 +409,14 @@ void Momentum::update(UInt32 batch_size, std::vector<Float64> & weights, Float64
}
void StochasticGradientDescent::update(
UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
/// batch_size is already checked to be greater than 0
for (size_t i = 0; i < weights.size(); ++i)
{
weights[i] += batch_gradient[i] / batch_size;
weights[i] += (learning_rate * batch_gradient[i]) / batch_size;
}
bias += batch_gradient[weights.size()] / batch_size;
bias += (learning_rate * batch_gradient[weights.size()]) / batch_size;
}
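
The refactoring above moves the learning rate out of the gradient computers and into the updaters. With B the batch size, G the summed batch gradient (ascent direction, L2 term included), and eta the learning rate, the implemented rules read:

\[
\text{SGD:}\quad w \mathrel{+}= \frac{\eta\, G}{B}, \qquad
\text{Momentum:}\quad u \leftarrow \alpha u + \frac{\eta\, G}{B},\; w \mathrel{+}= u,
\]

and Nesterov performs the same accumulation while evaluating the gradient at weights shifted by \(\alpha u\) (see Nesterov::add_to_batch).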
void IWeightsUpdater::add_to_batch(
@ -337,15 +424,16 @@ void IWeightsUpdater::add_to_batch(
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num)
{
gradient_computer.compute(batch_gradient, weights, bias, learning_rate, l2_reg_coef, target, columns, row_num);
gradient_computer.compute(batch_gradient, weights, bias, l2_reg_coef, target, columns, row_num);
}
/// Gradient computers
void LogisticRegression::predict(
ColumnVector<Float64>::Container & container,
Block & block,
@ -387,7 +475,6 @@ void LogisticRegression::compute(
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -402,11 +489,11 @@ void LogisticRegression::compute(
derivative *= target;
derivative = exp(derivative);
batch_gradient[weights.size()] += learning_rate * target / (derivative + 1);
batch_gradient[weights.size()] += target / (derivative + 1);
for (size_t i = 0; i < weights.size(); ++i)
{
auto value = (*columns[i]).getFloat64(row_num);
batch_gradient[i] += learning_rate * target * value / (derivative + 1) - 2 * learning_rate * l2_reg_coef * weights[i];
batch_gradient[i] += target * value / (derivative + 1) - 2 * l2_reg_coef * weights[i];
}
}
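
With z = b + w·x and label y (assumed to be ±1, as in the standard logistic loss), the per-row contribution accumulated above is the ascent gradient of log sigma(y z) with an L2 penalty:

\[
g_i \mathrel{+}= \frac{\partial}{\partial w_i} \log \sigma(y z) - 2 \lambda w_i
     = \frac{y\, x_i}{1 + e^{y z}} - 2 \lambda w_i, \qquad
g_b \mathrel{+}= \frac{y}{1 + e^{y z}}.
\]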
@ -459,7 +546,6 @@ void LinearRegression::compute(
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -471,13 +557,13 @@ void LinearRegression::compute(
auto value = (*columns[i]).getFloat64(row_num);
derivative -= weights[i] * value;
}
derivative *= (2 * learning_rate);
derivative *= 2;
batch_gradient[weights.size()] += derivative;
for (size_t i = 0; i < weights.size(); ++i)
{
auto value = (*columns[i]).getFloat64(row_num);
batch_gradient[i] += derivative * value - 2 * learning_rate * l2_reg_coef * weights[i];
batch_gradient[i] += derivative * value - 2 * l2_reg_coef * weights[i];
}
}
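
Similarly, LinearRegression::compute accumulates the ascent gradient of the negated squared error, with residual r = y - (b + w·x):

\[
g_b \mathrel{+}= 2r, \qquad g_i \mathrel{+}= 2 r\, x_i - 2 \lambda w_i.
\]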

View File

@ -33,7 +33,6 @@ public:
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -60,7 +59,6 @@ public:
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -87,7 +85,6 @@ public:
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -120,14 +117,18 @@ public:
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num);
/// Updates current weights according to the gradient from the last mini-batch
virtual void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & gradient) = 0;
virtual void update(
UInt64 batch_size,
std::vector<Float64> & weights,
Float64 & bias,
Float64 learning_rate,
const std::vector<Float64> & gradient) = 0;
/// Used during the merge of two states
virtual void merge(const IWeightsUpdater &, Float64, Float64) {}
@ -143,7 +144,7 @@ public:
class StochasticGradientDescent : public IWeightsUpdater
{
public:
void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
};
@ -154,7 +155,7 @@ public:
Momentum(Float64 alpha) : alpha_(alpha) {}
void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;
@ -180,13 +181,12 @@ public:
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num) override;
void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;
@ -195,11 +195,51 @@ public:
void read(ReadBuffer & buf) override;
private:
Float64 alpha_{0.1};
const Float64 alpha_ = 0.9;
std::vector<Float64> accumulated_gradient;
};
class Adam : public IWeightsUpdater
{
public:
Adam()
{
beta1_powered_ = beta1_;
beta2_powered_ = beta2_;
}
void add_to_batch(
std::vector<Float64> & batch_gradient,
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num) override;
void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;
void write(WriteBuffer & buf) const override;
void read(ReadBuffer & buf) override;
private:
/// Recommended values for the beta1 and beta2 hyperparameters
const Float64 beta1_ = 0.9;
const Float64 beta2_ = 0.999;
const Float64 eps_ = 0.000001;
Float64 beta1_powered_;
Float64 beta2_powered_;
std::vector<Float64> average_gradient;
std::vector<Float64> average_squared_gradient;
};
/** LinearModelData is a class which manages current state of learning
*/
class LinearModelData
@ -210,8 +250,8 @@ public:
LinearModelData(
Float64 learning_rate,
Float64 l2_reg_coef,
UInt32 param_num,
UInt32 batch_capacity,
UInt64 param_num,
UInt64 batch_capacity,
std::shared_ptr<IGradientComputer> gradient_computer,
std::shared_ptr<IWeightsUpdater> weights_updater);
@ -269,7 +309,7 @@ public:
std::string weights_updater_name,
Float64 learning_rate,
Float64 l2_reg_coef,
UInt32 batch_size,
UInt64 batch_size,
const DataTypes & arguments_types,
const Array & params)
: IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>(arguments_types, params)
@ -303,6 +343,8 @@ public:
new_weights_updater = std::make_shared<Momentum>();
else if (weights_updater_name == "Nesterov")
new_weights_updater = std::make_shared<Nesterov>();
else if (weights_updater_name == "Adam")
new_weights_updater = std::make_shared<Adam>();
else
throw Exception("Illegal name of weights updater (should have been checked earlier)", ErrorCodes::LOGICAL_ERROR);
@ -355,10 +397,10 @@ public:
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt32 param_num;
UInt64 param_num;
Float64 learning_rate;
Float64 l2_reg_coef;
UInt32 batch_size;
UInt64 batch_size;
std::shared_ptr<IGradientComputer> gradient_computer;
std::string weights_updater_name;
};
@ -371,4 +413,5 @@ struct NameLogisticRegression
{
static constexpr auto name = "stochasticLogisticRegression";
};
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <Common/HashTable/Hash.h>
#include <Common/MemoryTracker.h>
#include <Common/PODArray.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
@ -513,8 +512,6 @@ private:
void mediumToLarge()
{
CurrentMemoryTracker::alloc(sizeof(detail::QuantileTimingLarge));
/// While the data is copied from medium, it is not possible to set `large` value (otherwise it will overwrite some data).
detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
@ -528,8 +525,6 @@ private:
void tinyToLarge()
{
CurrentMemoryTracker::alloc(sizeof(detail::QuantileTimingLarge));
/// While the data is copied from `medium` it is not possible to set `large` value (otherwise it will overwrite some data).
detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
@ -562,8 +557,6 @@ public:
else if (kind == Kind::Large)
{
delete large;
CurrentMemoryTracker::free(sizeof(detail::QuantileTimingLarge));
}
}

View File

@ -126,20 +126,32 @@ private:
{
for (size_t i = 0; i < buf_size(); ++i)
{
if (buf[i] && !good(buf[i]))
if (buf[i])
{
buf[i] = 0;
--m_size;
if (!good(buf[i]))
{
buf[i] = 0;
--m_size;
}
/** After removing elements, free slots may have appeared for items
* that were placed further than necessary due to collisions.
* They need to be moved.
*/
else if (i != place(buf[i]))
{
HashValue x = buf[i];
buf[i] = 0;
reinsertImpl(x);
}
}
}
/** After removing the elements, there may have been room for items,
* which were placed further than necessary, due to a collision.
* You need to move them.
/** We must process the first collision resolution chain once again.
* See the comment in the "resize" function.
*/
for (size_t i = 0; i < buf_size(); ++i)
for (size_t i = 0; i < buf_size() && buf[i]; ++i)
{
if (unlikely(buf[i] && i != place(buf[i])))
if (i != place(buf[i]))
{
HashValue x = buf[i];
buf[i] = 0;

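The shrink logic above interleaves deletion with the classic open-addressing fixup: once a slot is cleared, survivors of the same collision chain may sit further from their ideal slot than necessary and must be reinserted. A minimal, hypothetical sketch of the idea (linear probing with a modulo hash; the real UniquesHashSet differs, and this sketch omits the extra pass over the first collision chain that the code above performs for wrap-around):

#include <cstddef>
#include <vector>

struct OpenTable
{
    std::vector<unsigned> buf;  /// 0 marks an empty slot
    explicit OpenTable(size_t n) : buf(n, 0) {}

    size_t place(unsigned x) const { return x % buf.size(); }

    void insert(unsigned x)
    {
        size_t i = place(x);
        while (buf[i] != 0)  /// linear probing
            i = (i + 1) % buf.size();
        buf[i] = x;
    }

    /// Remove elements failing `keep`, then move displaced survivors closer.
    template <typename Pred>
    void eraseIf(Pred keep)
    {
        for (size_t i = 0; i < buf.size(); ++i)
        {
            if (buf[i] && !keep(buf[i]))
                buf[i] = 0;  /// deletion opens a hole in the chain
            else if (buf[i] && i != place(buf[i]))
            {
                unsigned x = buf[i];  /// may now fit closer to its ideal slot
                buf[i] = 0;
                insert(x);
            }
        }
    }
};
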
View File

@ -30,9 +30,9 @@ ColumnAggregateFunction::~ColumnAggregateFunction()
func->destroy(val);
}
void ColumnAggregateFunction::addArena(ArenaPtr arena_)
void ColumnAggregateFunction::addArena(ConstArenaPtr arena_)
{
arenas.push_back(arena_);
foreign_arenas.push_back(arena_);
}
MutableColumnPtr ColumnAggregateFunction::convertToValues() const
@ -265,27 +265,21 @@ void ColumnAggregateFunction::updateHashWithValue(size_t n, SipHash & hash) cons
hash.update(wbuf.str().c_str(), wbuf.str().size());
}
/// NOTE: Highly overestimates size of a column if it was produced in AggregatingBlockInputStream (it contains size of other columns)
/// The returned size is less than real size. The reason is that some parts of
/// aggregate function data may be allocated on shared arenas. These arenas are
/// used for several blocks, and also may be updated concurrently from other
/// threads, so we can't know the size of these data.
size_t ColumnAggregateFunction::byteSize() const
{
size_t res = data.size() * sizeof(data[0]);
for (const auto & arena : arenas)
res += arena->size();
return res;
return data.size() * sizeof(data[0])
+ (my_arena ? my_arena->size() : 0);
}
/// Like byteSize(), highly overestimates size
/// Like in byteSize(), the size is underestimated.
size_t ColumnAggregateFunction::allocatedBytes() const
{
size_t res = data.allocated_bytes();
for (const auto & arena : arenas)
res += arena->size();
return res;
return data.allocated_bytes()
+ (my_arena ? my_arena->size() : 0);
}
void ColumnAggregateFunction::protect()
@ -295,7 +289,7 @@ void ColumnAggregateFunction::protect()
MutableColumnPtr ColumnAggregateFunction::cloneEmpty() const
{
return create(func, Arenas(1, std::make_shared<Arena>()));
return create(func);
}
String ColumnAggregateFunction::getTypeString() const
@ -364,9 +358,10 @@ void ColumnAggregateFunction::insertMergeFrom(const IColumn & from, size_t n)
Arena & ColumnAggregateFunction::createOrGetArena()
{
if (unlikely(arenas.empty()))
arenas.emplace_back(std::make_shared<Arena>());
return *arenas.back().get();
if (unlikely(!my_arena))
my_arena = std::make_shared<Arena>();
return *my_arena.get();
}
@ -542,4 +537,31 @@ void ColumnAggregateFunction::getExtremes(Field & min, Field & max) const
max = serialized;
}
namespace
{
ConstArenas concatArenas(const ConstArenas & array, ConstArenaPtr arena)
{
ConstArenas result = array;
if (arena)
result.push_back(std::move(arena));
return result;
}
}
ColumnAggregateFunction::MutablePtr ColumnAggregateFunction::createView() const
{
auto res = create(func, concatArenas(foreign_arenas, my_arena));
res->src = getPtr();
return res;
}
ColumnAggregateFunction::ColumnAggregateFunction(const ColumnAggregateFunction & src_)
: foreign_arenas(concatArenas(src_.foreign_arenas, src_.my_arena)),
func(src_.func), src(src_.getPtr()), data(src_.data.begin(), src_.data.end())
{
}
}

View File

@ -17,7 +17,8 @@ namespace DB
class Arena;
using ArenaPtr = std::shared_ptr<Arena>;
using Arenas = std::vector<ArenaPtr>;
using ConstArenaPtr = std::shared_ptr<const Arena>;
using ConstArenas = std::vector<ConstArenaPtr>;
/** Column of states of aggregate functions.
@ -52,8 +53,15 @@ public:
private:
friend class COWHelper<IColumn, ColumnAggregateFunction>;
/// Memory pools. Aggregate states are allocated from them.
Arenas arenas;
/// Arenas used by function states that are created elsewhere. We own these
/// arenas in the sense of extending their lifetime, but do not modify them.
/// Even reading these arenas is unsafe, because they may be shared with
/// other data blocks and modified by other threads concurrently.
ConstArenas foreign_arenas;
/// Arena for allocating the internals of function states created by current
/// column (e.g., when inserting new states).
ArenaPtr my_arena;
/// Used for destroying states and for finalization of values.
AggregateFunctionPtr func;
@ -68,12 +76,7 @@ private:
ColumnAggregateFunction() {}
/// Create a new column that has another column as a source.
MutablePtr createView() const
{
MutablePtr res = create(func, arenas);
res->src = getPtr();
return res;
}
MutablePtr createView() const;
/// If we have another column as a source (owner of data), copy all data to ourself and reset source.
/// This is needed before inserting new elements, because we must own these elements (to destroy them in destructor),
@ -85,15 +88,14 @@ private:
{
}
ColumnAggregateFunction(const AggregateFunctionPtr & func_, const Arenas & arenas_)
: arenas(arenas_), func(func_)
ColumnAggregateFunction(const AggregateFunctionPtr & func_,
const ConstArenas & arenas_)
: foreign_arenas(arenas_), func(func_)
{
}
ColumnAggregateFunction(const ColumnAggregateFunction & src_)
: arenas(src_.arenas), func(src_.func), src(src_.getPtr()), data(src_.data.begin(), src_.data.end())
{
}
ColumnAggregateFunction(const ColumnAggregateFunction & src_);
String getTypeString() const;
@ -109,7 +111,7 @@ public:
AggregateFunctionPtr getAggregateFunction() const { return func; }
/// Take shared ownership of Arena, that holds memory for states of aggregate functions.
void addArena(ArenaPtr arena_);
void addArena(ConstArenaPtr arena_);
/** Transform column with states of aggregate functions to column with final result values.
*/

View File

@ -108,13 +108,92 @@ class AllocatorWithHint : Hint
{
protected:
static constexpr bool clear_memory = clear_memory_;
static constexpr size_t small_memory_threshold = mmap_threshold;
public:
/// Allocate memory range.
void * alloc(size_t size, size_t alignment = 0)
{
CurrentMemoryTracker::alloc(size);
return allocNoTrack(size, alignment);
}
/// Free memory range.
void free(void * buf, size_t size)
{
freeNoTrack(buf, size);
CurrentMemoryTracker::free(size);
}
/** Enlarge memory range.
* Data from old range is moved to the beginning of new range.
* Address of memory range could change.
*/
void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0)
{
if (old_size == new_size)
{
/// nothing to do.
/// BTW, it's not possible to change alignment while doing realloc.
}
else if (old_size < mmap_threshold && new_size < mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
buf = new_buf;
if constexpr (clear_memory)
if (new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
}
else if (old_size >= mmap_threshold && new_size >= mmap_threshold)
{
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
// On Apple and FreeBSD, a self-implemented mremap is used (common/mremap.h)
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
/// No need for zero-fill, because mmap guarantees it.
}
else if (new_size < small_memory_threshold)
{
/// Small allocs that require a copy. Assume there is enough memory in the system. Call CurrentMemoryTracker once.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = allocNoTrack(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
freeNoTrack(buf, old_size);
buf = new_buf;
}
else
{
/// Big allocs that require a copy. MemoryTracker is called inside the 'alloc' and 'free' methods.
void * new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
free(buf, old_size);
buf = new_buf;
}
return buf;
}
protected:
static constexpr size_t getStackThreshold()
{
return 0;
}
private:
void * allocNoTrack(size_t size, size_t alignment)
{
void * buf;
if (size >= mmap_threshold)
@ -149,15 +228,14 @@ public:
if (0 != res)
DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
if (clear_memory)
if constexpr (clear_memory)
memset(buf, 0, size);
}
}
return buf;
}
/// Free memory range.
void free(void * buf, size_t size)
void freeNoTrack(void * buf, size_t size)
{
if (size >= mmap_threshold)
{
@ -168,63 +246,6 @@ public:
{
::free(buf);
}
CurrentMemoryTracker::free(size);
}
/** Enlarge memory range.
* Data from old range is moved to the beginning of new range.
* Address of memory range could change.
*/
void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0)
{
if (old_size == new_size)
{
/// nothing to do.
/// BTW, it's not possible to change alignment while doing realloc.
}
else if (old_size < mmap_threshold && new_size < mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
buf = new_buf;
if (clear_memory && new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
}
else if (old_size >= mmap_threshold && new_size >= mmap_threshold)
{
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
// On apple and freebsd self-implemented mremap used (common/mremap.h)
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
/// No need for zero-fill, because mmap guarantees it.
}
else
{
/// All other cases that requires a copy. MemoryTracker is called inside 'alloc', 'free' methods.
void * new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
free(buf, old_size);
buf = new_buf;
}
return buf;
}
protected:
static constexpr size_t getStackThreshold()
{
return 0;
}
};
@ -267,7 +288,7 @@ public:
{
if (size <= N)
{
if (Base::clear_memory)
if constexpr (Base::clear_memory)
memset(stack_memory, 0, N);
return stack_memory;
}

View File

@ -3,7 +3,6 @@
#include <Common/HashTable/SmallTable.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HyperLogLogCounter.h>
#include <Common/MemoryTracker.h>
#include <Core/Defines.h>
@ -230,7 +229,6 @@ private:
if (getContainerType() != details::ContainerType::SMALL)
throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
CurrentMemoryTracker::alloc(sizeof(Medium));
auto tmp_medium = std::make_unique<Medium>();
for (const auto & x : small)
@ -247,7 +245,6 @@ private:
if ((container_type != details::ContainerType::SMALL) && (container_type != details::ContainerType::MEDIUM))
throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
CurrentMemoryTracker::alloc(sizeof(Large));
auto tmp_large = std::make_unique<Large>();
if (container_type == details::ContainerType::SMALL)
@ -277,15 +274,11 @@ private:
{
delete medium;
medium = nullptr;
CurrentMemoryTracker::free(sizeof(Medium));
}
else if (container_type == details::ContainerType::LARGE)
{
delete large;
large = nullptr;
CurrentMemoryTracker::free(sizeof(Large));
}
}

View File

@ -46,6 +46,12 @@ MemoryTracker * CurrentThread::getMemoryTracker()
return &current_thread->memory_tracker;
}
Int64 & CurrentThread::getUntrackedMemory()
{
/// It assumes that (current_thread != nullptr) is already checked with getMemoryTracker()
return current_thread->untracked_memory;
}
void CurrentThread::updateProgressIn(const Progress & value)
{
if (unlikely(!current_thread))

View File

@ -3,6 +3,7 @@
#include <memory>
#include <string>
#include <common/likely.h>
#include <common/StringRef.h>
#include <Common/ThreadStatus.h>
@ -48,6 +49,7 @@ public:
static ProfileEvents::Counters & getProfileEvents();
static MemoryTracker * getMemoryTracker();
static Int64 & getUntrackedMemory();
/// Update read and write rows (bytes) statistics (used in system.query_thread_log)
static void updateProgressIn(const Progress & value);
@ -71,7 +73,12 @@ public:
static void finalizePerformanceCounters();
/// Returns a non-empty string if the thread is attached to a query
static StringRef getQueryId();
static StringRef getQueryId()
{
if (unlikely(!current_thread))
return {};
return current_thread->getQueryId();
}
/// Non-master threads call this method in destructor automatically
static void detachQuery();

View File

@ -434,6 +434,9 @@ namespace ErrorCodes
extern const int BAD_QUERY_PARAMETER = 457;
extern const int CANNOT_UNLINK = 458;
extern const int CANNOT_SET_THREAD_PRIORITY = 459;
extern const int CANNOT_CREATE_TIMER = 460;
extern const int CANNOT_SET_TIMER_PERIOD = 461;
extern const int CANNOT_DELETE_TIMER = 462;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -4,7 +4,6 @@
#include <Common/HyperLogLogCounter.h>
#include <Common/HashTable/SmallTable.h>
#include <Common/MemoryTracker.h>
namespace DB
@ -39,8 +38,6 @@ private:
void toLarge()
{
CurrentMemoryTracker::alloc(sizeof(Large));
/// At the time of copying data from `tiny`, setting the value of `large` is still not possible (otherwise it will overwrite some data).
Large * tmp_large = new Large;
@ -56,11 +53,7 @@ public:
~HyperLogLogWithSmallSetOptimization()
{
if (isLarge())
{
delete large;
CurrentMemoryTracker::free(sizeof(Large));
}
}
void insert(Key value)

View File

@ -1,3 +1,5 @@
#include <cstdlib>
#include "MemoryTracker.h"
#include <common/likely.h>
#include <common/logger_useful.h>
@ -17,6 +19,8 @@ namespace DB
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread can allocate/free memory within (-untracked_memory_limit, untracked_memory_limit) without touching the common counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
MemoryTracker::~MemoryTracker()
@ -85,6 +89,9 @@ void MemoryTracker::alloc(Int64 size)
{
free(size);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel();
std::stringstream message;
message << "Memory tracker";
if (description)
@ -100,6 +107,9 @@ void MemoryTracker::alloc(Int64 size)
{
free(size);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel();
std::stringstream message;
message << "Memory limit";
if (description)
@ -191,19 +201,41 @@ namespace CurrentMemoryTracker
void alloc(Int64 size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
memory_tracker->alloc(size);
{
Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
untracked += size;
if (untracked > untracked_memory_limit)
{
/// Zero `untracked` before tracking. If the tracker throws an out-of-limit exception, we will still be able to allocate up to untracked_memory_limit
/// bytes more, which can be useful for enlarging the Exception message in the rethrow logic.
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
}
}
}
void realloc(Int64 old_size, Int64 new_size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
memory_tracker->alloc(new_size - old_size);
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
}
void free(Int64 size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
memory_tracker->free(size);
{
Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
untracked -= size;
if (untracked < -untracked_memory_limit)
{
memory_tracker->free(-untracked);
untracked = 0;
}
}
}
}
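
A standalone sketch of the batching idea implemented above (hypothetical names, not the ClickHouse types): small allocations adjust a thread-local counter and only touch the shared tracker once the counter crosses the limit, which reduces contention on the hot allocation path.

#include <atomic>
#include <cstdint>

std::atomic<int64_t> total_memory{0};                 /// shared counter
constexpr int64_t untracked_limit = 4 * 1024 * 1024;  /// 4 MiB, as above
thread_local int64_t untracked = 0;                   /// per-thread slack

void trackAlloc(int64_t size)
{
    untracked += size;
    if (untracked > untracked_limit)
    {
        total_memory.fetch_add(untracked);  /// flush to the shared tracker
        untracked = 0;
    }
}

void trackFree(int64_t size)
{
    untracked -= size;
    if (untracked < -untracked_limit)
    {
        total_memory.fetch_add(untracked);  /// untracked is negative here
        untracked = 0;
    }
}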

View File

@ -45,7 +45,11 @@ public:
void realloc(Int64 old_size, Int64 new_size)
{
alloc(new_size - old_size);
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
}
/** This function should be called after memory deallocation.

View File

@ -0,0 +1,134 @@
#include "QueryProfiler.h"
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
namespace DB
{
extern LazyPipe trace_pipe;
namespace
{
/// Normally query_id is a UUID (a string of fixed length), but a user can provide a custom query_id.
/// Thus an upper bound on the query_id length should be introduced to avoid buffer overflow in the signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * /* info */, void * context)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(StackTrace) + // collected stack trace
sizeof(TimerType); // timer type
char buffer[buf_size];
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
writeChar(false, out);
writeStringBinary(query_id, out);
writePODBinary(stack_trace, out);
writePODBinary(timer_type, out);
out.next();
}
const UInt32 TIMER_PRECISION = 1e9;
}
namespace ErrorCodes
{
extern const int CANNOT_MANIPULATE_SIGSET;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_CREATE_TIMER;
extern const int CANNOT_SET_TIMER_PERIOD;
extern const int CANNOT_DELETE_TIMER;
}
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal)
: log(&Logger::get("QueryProfiler"))
, pause_signal(pause_signal)
{
struct sigaction sa{};
sa.sa_sigaction = ProfilerImpl::signalHandler;
sa.sa_flags = SA_SIGINFO | SA_RESTART;
if (sigemptyset(&sa.sa_mask))
throwFromErrno("Failed to clean signal mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaddset(&sa.sa_mask, pause_signal))
throwFromErrno("Failed to add signal to mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaction(pause_signal, &sa, previous_handler))
throwFromErrno("Failed to setup signal handler for query profiler", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
try
{
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = pause_signal;
sev._sigev_un._tid = thread_id;
if (timer_create(clock_type, &sev, &timer_id))
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION};
struct itimerspec timer_spec = {.it_interval = interval, .it_value = interval};
if (timer_settime(timer_id, 0, &timer_spec, nullptr))
throwFromErrno("Failed to set thread timer period", ErrorCodes::CANNOT_SET_TIMER_PERIOD);
}
catch (...)
{
tryCleanup();
throw;
}
}
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::~QueryProfilerBase()
{
tryCleanup();
}
template <typename ProfilerImpl>
void QueryProfilerBase<ProfilerImpl>::tryCleanup()
{
if (timer_id != nullptr && timer_delete(timer_id))
LOG_ERROR(log, "Failed to delete query profiler timer " + errnoToString(ErrorCodes::CANNOT_DELETE_TIMER));
if (previous_handler != nullptr && sigaction(pause_signal, previous_handler, nullptr))
LOG_ERROR(log, "Failed to restore signal handler after query profiler " + errnoToString(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER));
}
template class QueryProfilerBase<QueryProfilerReal>;
template class QueryProfilerBase<QueryProfilerCpu>;
QueryProfilerReal::QueryProfilerReal(const Int32 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_REALTIME, period, SIGUSR1)
{}
void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Real, sig, info, context);
}
QueryProfilerCpu::QueryProfilerCpu(const Int32 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_THREAD_CPUTIME_ID, period, SIGUSR2)
{}
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Cpu, sig, info, context);
}
}

View File

@ -0,0 +1,74 @@
#pragma once
#include <Core/Types.h>
#include <signal.h>
#include <time.h>
namespace Poco
{
class Logger;
}
namespace DB
{
enum class TimerType : UInt8
{
Real,
Cpu,
};
/**
* Query profiler implementation for a selected thread.
*
* This class installs a timer and a signal handler on creation in order to:
* 1. periodically pause the given thread
* 2. collect the thread's current stack trace
* 3. write the collected stack trace to trace_pipe for the TraceCollector
*
* The destructor tries to unset the timer and restore the previous signal handler.
* Note that the signal handler implementation is defined by the template parameter. See QueryProfilerReal and QueryProfilerCpu.
*/
template <typename ProfilerImpl>
class QueryProfilerBase
{
public:
QueryProfilerBase(const Int32 thread_id, const int clock_type, const UInt32 period, const int pause_signal = SIGALRM);
~QueryProfilerBase();
private:
void tryCleanup();
Poco::Logger * log;
/// Timer id from timer_create(2)
timer_t timer_id = nullptr;
/// Pause signal to interrupt threads to get traces
int pause_signal;
/// Previous signal handler to restore after query profiler exits
struct sigaction * previous_handler = nullptr;
};
/// Query profiler with timer based on real clock
class QueryProfilerReal : public QueryProfilerBase<QueryProfilerReal>
{
public:
QueryProfilerReal(const Int32 thread_id, const UInt32 period);
static void signalHandler(int sig, siginfo_t * info, void * context);
};
/// Query profiler with timer based on CPU clock
class QueryProfilerCpu : public QueryProfilerBase<QueryProfilerCpu>
{
public:
QueryProfilerCpu(const Int32 thread_id, const UInt32 period);
static void signalHandler(int sig, siginfo_t * info, void * context);
};
}
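A minimal usage sketch (illustrative, not from this patch; the SYS_gettid call and the 10 ms period are assumptions). The profiler samples the thread for as long as the object stays alive:

    #include <unistd.h>
    #include <sys/syscall.h>

    void profileCurrentThread()
    {
        /// Linux-specific way to get the kernel thread id expected by timer_create with SIGEV_THREAD_ID.
        const DB::Int32 thread_id = static_cast<DB::Int32>(syscall(SYS_gettid));
        const DB::UInt32 period_ns = 10 * 1000 * 1000; /// 10 ms; the period is in nanoseconds (TIMER_PRECISION == 1e9)
        /// The constructor installs the SIGUSR1 handler and arms the timer;
        /// the destructor disarms the timer and restores the previous handler.
        DB::QueryProfilerReal profiler(thread_id, period_ns);
        /// ... code executed while `profiler` is alive is sampled every ~10 ms ...
    }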

View File

@ -8,52 +8,10 @@
#include <IO/WriteHelpers.h>
#include <port/unistd.h>
#include <csignal>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PIPE;
extern const int CANNOT_DLSYM;
extern const int CANNOT_FORK;
extern const int CANNOT_WAITPID;
extern const int CHILD_WAS_NOT_EXITED_NORMALLY;
extern const int CANNOT_CREATE_CHILD_PROCESS;
}
}
#include <common/Pipe.h>
namespace
{
struct Pipe
{
int fds_rw[2];
Pipe()
{
#ifndef __APPLE__
if (0 != pipe2(fds_rw, O_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
#else
if (0 != pipe(fds_rw))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
#endif
}
~Pipe()
{
if (fds_rw[0] >= 0)
close(fds_rw[0]);
if (fds_rw[1] >= 0)
close(fds_rw[1]);
}
};
/// By these return codes from the child process, we learn (for sure) about errors when creating it.
enum class ReturnCodes : int
{
@ -64,10 +22,18 @@ namespace
};
}
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_DLSYM;
extern const int CANNOT_FORK;
extern const int CANNOT_WAITPID;
extern const int CHILD_WAS_NOT_EXITED_NORMALLY;
extern const int CANNOT_CREATE_CHILD_PROCESS;
}
ShellCommand::ShellCommand(pid_t pid, int in_fd, int out_fd, int err_fd, bool terminate_in_destructor_)
: pid(pid)
, terminate_in_destructor(terminate_in_destructor_)

View File

@ -34,6 +34,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
// Replace NLMSG_OK with explicit casts since that system macro contains signedness bugs which are not going to be fixed.
static inline bool is_nlmsg_ok(const struct nlmsghdr * const nlh, const ssize_t len)
{
return len >= static_cast<ssize_t>(sizeof(*nlh)) && nlh->nlmsg_len >= sizeof(*nlh) && static_cast<size_t>(len) >= nlh->nlmsg_len;
}
namespace
{
@ -128,7 +133,7 @@ struct NetlinkMessage
if (header.nlmsg_type == NLMSG_ERROR)
throw Exception("Can't receive Netlink response: error " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR);
if (!NLMSG_OK((&header), bytes_received))
if (!is_nlmsg_ok(&header, bytes_received))
throw Exception("Can't receive Netlink response: wrong number of bytes received", ErrorCodes::NETLINK_ERROR);
}
};

View File

@ -4,6 +4,7 @@
#include <Common/Exception.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h>
#include <Poco/Logger.h>
@ -50,6 +51,19 @@ ThreadStatus::ThreadStatus()
ThreadStatus::~ThreadStatus()
{
try
{
if (untracked_memory > 0)
memory_tracker.alloc(untracked_memory);
else
memory_tracker.free(-untracked_memory);
}
catch (const DB::Exception &)
{
/// It's a minor tracked memory leak here (not the memory itself but its counter).
/// We've already allocated a little bit more than the limit and cannot track it in the thread memory tracker or its parent.
}
if (deleter)
deleter();
current_thread = nullptr;

View File

@ -28,6 +28,8 @@ namespace DB
class Context;
class QueryStatus;
class ThreadStatus;
class QueryProfilerReal;
class QueryProfilerCpu;
class QueryThreadLog;
struct TasksStatsCounters;
struct RUsageCounters;
@ -96,6 +98,8 @@ public:
/// TODO: merge them into common entity
ProfileEvents::Counters performance_counters{VariableContext::Thread};
MemoryTracker memory_tracker{VariableContext::Thread};
/// Small amount of untracked memory (per thread atomic-less counter)
Int64 untracked_memory = 0;
/// Statistics of read and write rows/bytes
Progress progress_in;
@ -121,7 +125,10 @@ public:
return thread_state.load(std::memory_order_relaxed);
}
StringRef getQueryId() const;
StringRef getQueryId() const
{
return query_id;
}
/// Starts new query and create new thread group for it, current thread becomes master thread of the query
void initializeQuery();
@ -153,6 +160,10 @@ public:
protected:
void initPerformanceCounters();
void initQueryProfiler();
void finalizeQueryProfiler();
void logToQueryThreadLog(QueryThreadLog & thread_log);
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr);
@ -176,6 +187,10 @@ protected:
time_t query_start_time = 0;
size_t queries_started = 0;
// CPU and Real time query profilers
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
Poco::Logger * log = nullptr;
friend class CurrentThread;

View File

@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <common/sleep.h>
#include <IO/WriteHelpers.h>
#include <port/clock.h>
@ -76,12 +77,7 @@ public:
if (desired_ns > elapsed_ns)
{
UInt64 sleep_ns = desired_ns - elapsed_ns;
::timespec sleep_ts;
sleep_ts.tv_sec = sleep_ns / 1000000000;
sleep_ts.tv_nsec = sleep_ns % 1000000000;
/// NOTE: Returns early in case of a signal. This is considered normal.
::nanosleep(&sleep_ts, nullptr);
sleepForNanoseconds(sleep_ns);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_ns / 1000UL);
}

View File

@ -0,0 +1,100 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <Poco/Logger.h>
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Common/Exception.h>
#include <Interpreters/TraceLog.h>
namespace DB
{
LazyPipe trace_pipe;
namespace ErrorCodes
{
extern const int NULL_POINTER_DEREFERENCE;
extern const int THREAD_IS_NOT_JOINABLE;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
: log(&Poco::Logger::get("TraceCollector"))
, trace_log(trace_log)
{
if (trace_log == nullptr)
throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
trace_pipe.open();
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined");
else
{
TraceCollector::notifyToStop();
thread.join();
}
trace_pipe.close();
}
/**
* Sends the TraceCollector stop message.
*
* Each sequence of data for the TraceCollector thread starts with a boolean flag.
* If this flag is true, the TraceCollector must stop reading trace_pipe and exit.
* This function sends the flag with a true value to stop the TraceCollector gracefully.
*
* NOTE: The TraceCollector will NOT stop immediately, as there may be some data left in the pipe
* before the stop message.
*/
void TraceCollector::notifyToStop()
{
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
writeChar(true, out);
out.next();
}
void TraceCollector::run()
{
ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
while (true)
{
char is_last;
readChar(is_last, in);
if (is_last)
break;
std::string query_id;
StackTrace stack_trace(NoCapture{});
TimerType timer_type;
readStringBinary(query_id, in);
readPODBinary(stack_trace, in);
readPODBinary(timer_type, in);
const auto size = stack_trace.getSize();
const auto & frames = stack_trace.getFrames();
Array trace;
trace.reserve(size);
for (size_t i = 0; i < size; i++)
trace.emplace_back(UInt64(reinterpret_cast<uintptr_t>(frames[i])));
TraceLogElement element{std::time(nullptr), timer_type, query_id, trace};
trace_log->add(element);
}
}
}
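For reference, a standalone sketch of the same stop-flag framing over a plain POSIX pipe (the names and the uint64_t payload are illustrative; error handling and partial reads are ignored here):

    #include <unistd.h>
    #include <cstdint>

    /// Writer: every frame begins with a one-byte stop flag, like writeChar(true/false) above.
    void sendFrame(int write_fd, bool stop, uint64_t payload = 0)
    {
        char flag = stop;
        (void)write(write_fd, &flag, 1);                      /// flag first
        if (!stop)
            (void)write(write_fd, &payload, sizeof(payload)); /// then the data
    }

    /// Reader: drain frames until the stop flag arrives, mirroring TraceCollector::run().
    void readLoop(int read_fd)
    {
        while (true)
        {
            char flag;
            if (read(read_fd, &flag, 1) != 1 || flag)
                break; /// stop requested (or the pipe was closed)
            uint64_t payload;
            (void)read(read_fd, &payload, sizeof(payload));
            /// ... handle payload ...
        }
    }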

View File

@ -0,0 +1,32 @@
#pragma once
#include <Common/ThreadPool.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class TraceLog;
class TraceCollector
{
private:
Poco::Logger * log;
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
void run();
static void notifyToStop();
public:
TraceCollector(std::shared_ptr<TraceLog> & trace_log);
~TraceCollector();
};
}

View File

@ -0,0 +1,143 @@
#include <new>
#include <common/config_common.h>
#include <common/memory.h>
#include <Common/MemoryTracker.h>
/// Replace default new/delete with memory tracking versions.
/// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new
/// https://en.cppreference.com/w/cpp/memory/new/operator_delete
#if NOT_UNBUNDLED
namespace Memory
{
ALWAYS_INLINE void trackMemory(std::size_t size)
{
#if USE_JEMALLOC
/// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function
/// @note je_mallocx() != je_malloc(). It's expected they don't differ much in allocation logic.
if (likely(size != 0))
CurrentMemoryTracker::alloc(nallocx(size, 0));
#else
CurrentMemoryTracker::alloc(size);
#endif
}
ALWAYS_INLINE bool trackMemoryNoExept(std::size_t size) noexcept
{
try
{
trackMemory(size);
}
catch (...)
{
return false;
}
return true;
}
ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0) noexcept
{
try
{
#if USE_JEMALLOC
/// @note It's also possible to use je_malloc_usable_size() here.
if (likely(ptr != nullptr))
CurrentMemoryTracker::free(sallocx(ptr, 0));
#else
if (size)
CurrentMemoryTracker::free(size);
#endif
}
catch (...)
{}
}
}
/// new
void * operator new(std::size_t size)
{
Memory::trackMemory(size);
return Memory::newImpl(size);
}
void * operator new[](std::size_t size)
{
Memory::trackMemory(size);
return Memory::newImpl(size);
}
void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
return Memory::newNoExept(size);
return nullptr;
}
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
return Memory::newNoExept(size);
return nullptr;
}
/// delete
/// C++17 std 21.6.2.1 (11)
/// If a function without a size parameter is defined, the program should also define the corresponding function with a size parameter.
/// If a function with a size parameter is defined, the program shall also define the corresponding version without the size parameter.
/// cppreference:
/// It's unspecified whether size-aware or size-unaware version is called when deleting objects of
/// incomplete type and arrays of non-class and trivially-destructible class types.
void operator delete(void * ptr) noexcept
{
Memory::untrackMemory(ptr);
Memory::deleteImpl(ptr);
}
void operator delete[](void * ptr) noexcept
{
Memory::untrackMemory(ptr);
Memory::deleteImpl(ptr);
}
void operator delete(void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
Memory::deleteSized(ptr, size);
}
void operator delete[](void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
Memory::deleteSized(ptr, size);
}
#else
/// new
void * operator new(std::size_t size) { return Memory::newImpl(size); }
void * operator new[](std::size_t size) { return Memory::newImpl(size); }
void * operator new(std::size_t size, const std::nothrow_t &) noexcept { return Memory::newNoExept(size); }
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept { return Memory::newNoExept(size); }
/// delete
void operator delete(void * ptr) noexcept { Memory::deleteImpl(ptr); }
void operator delete[](void * ptr) noexcept { Memory::deleteImpl(ptr); }
void operator delete(void * ptr, const std::nothrow_t &) noexcept { Memory::deleteImpl(ptr); }
void operator delete[](void * ptr, const std::nothrow_t &) noexcept { Memory::deleteImpl(ptr); }
void operator delete(void * ptr, std::size_t size) noexcept { Memory::deleteSized(ptr, size); }
void operator delete[](void * ptr, std::size_t size) noexcept { Memory::deleteSized(ptr, size); }
#endif
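For comparison, a self-contained sketch of the same override pattern with a plain atomic byte counter in place of CurrentMemoryTracker (illustrative only; unlike the code above it skips the jemalloc nallocx/sallocx size computation, and the scalar forms shown also back the array forms by default):

    #include <atomic>
    #include <cstdlib>
    #include <new>

    static std::atomic<std::size_t> allocated_bytes{0};

    void * operator new(std::size_t size)
    {
        allocated_bytes.fetch_add(size, std::memory_order_relaxed); /// track before allocating
        if (void * ptr = std::malloc(size))
            return ptr;
        throw std::bad_alloc{};
    }

    void operator delete(void * ptr) noexcept
    {
        /// Unsized delete: the counter cannot be adjusted without asking the
        /// allocator for the block size (cf. sallocx in the jemalloc branch above).
        std::free(ptr);
    }

    void operator delete(void * ptr, std::size_t size) noexcept
    {
        allocated_bytes.fetch_sub(size, std::memory_order_relaxed); /// sized delete: exact adjustment
        std::free(ptr);
    }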

View File

@ -139,3 +139,7 @@
/// This number is only used for distributed version compatible.
/// It could be any magic number.
#define DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER 0xCAFECABE
/// A macro for suppressing warnings about unused variables or function results.
/// Useful for structured bindings which have no standard way to declare this.
#define UNUSED(X) (void) (X)
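For example (an illustrative fragment; `settings_map` and `process` are hypothetical):

    for (const auto & [key, value] : settings_map)
    {
        UNUSED(value); /// individual structured bindings cannot be marked [[maybe_unused]]
        process(key);
    }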

View File

@ -170,7 +170,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow, CSVWithNames, TSVWithNames and TSKV formats).") \
M(SettingBool, input_format_with_names_use_header, false, "For TSVWithNames and CSVWithNames input formats this controls whether format parser is to assume that column data appear in the input exactly as they are specified in the header.") \
M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \
M(SettingBool, input_format_defaults_for_omitted_fields, false, "For input data calculate default expressions for omitted fields (it works for JSONEachRow format).") \
M(SettingBool, input_format_defaults_for_omitted_fields, true, "For input data calculate default expressions for omitted fields (it works for JSONEachRow format).") \
\
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \
\
@ -221,6 +221,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \
M(SettingUInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.") \
M(SettingUInt64, query_profiler_real_time_period_ns, 0, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off real clock query profiler") \
M(SettingUInt64, query_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off CPU clock query profiler") \
\
\
/** Limits during query execution are part of the settings. \
@ -332,7 +334,11 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, allow_simdjson, true, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.") \
\
M(SettingUInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.") \
M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value")
M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value") \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13")
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)

View File

@ -3,7 +3,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/Quota.h>
#include <Common/CurrentThread.h>
#include <common/sleep.h>
namespace ProfileEvents
{
@ -255,13 +255,7 @@ static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_i
if (desired_microseconds > total_elapsed_microseconds)
{
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
::timespec sleep_ts;
sleep_ts.tv_sec = sleep_microseconds / 1000000;
sleep_ts.tv_nsec = sleep_microseconds % 1000000 * 1000;
/// NOTE: Returns early in case of a signal. This is considered normal.
/// NOTE: It's worth noting that this behavior affects kill of queries.
::nanosleep(&sleep_ts, nullptr);
sleepForMicroseconds(sleep_microseconds);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
}

View File

@ -51,7 +51,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);
if (context.getSettingsRef().input_format_defaults_for_omitted_fields)
if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->table.empty())
{
StoragePtr storage = context.getTable(ast_insert_query->database, ast_insert_query->table);
auto column_defaults = storage->getColumns().getDefaults();

View File

@ -50,6 +50,11 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
}
else if (Protocol::Server::TableColumns == packet.type)
{
/// The server could attach a ColumnsDescription in front of the stream for column defaults. There's no need to pass it
/// through, because the client has already got this information for the remote table. Ignore.
}
else
throw NetException("Unexpected packet from server (expected Data or Exception, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);

View File

@ -40,8 +40,10 @@ TTLBlockInputStream::TTLBlockInputStream(
auto it = column_defaults.find(name);
if (it != column_defaults.end())
default_expr_list->children.emplace_back(
setAlias(it->second.expression, it->first));
{
auto expression = it->second.expression->clone();
default_expr_list->children.emplace_back(setAlias(expression, it->first));
}
}
else
new_ttl_infos.columns_ttl.emplace(name, ttl_info);

View File

@ -24,32 +24,15 @@ TotalsHavingBlockInputStream::TotalsHavingBlockInputStream(
/// Initialize current totals with initial state.
arena = std::make_shared<Arena>();
Block source_header = children.at(0)->getHeader();
current_totals.reserve(source_header.columns());
for (const auto & elem : source_header)
{
if (const ColumnAggregateFunction * column = typeid_cast<const ColumnAggregateFunction *>(elem.column.get()))
{
/// Create ColumnAggregateFunction with initial aggregate function state.
IAggregateFunction * function = column->getAggregateFunction().get();
auto target = ColumnAggregateFunction::create(column->getAggregateFunction(), Arenas(1, arena));
AggregateDataPtr data = arena->alignedAlloc(function->sizeOfData(), function->alignOfData());
function->create(data);
target->getData().push_back(data);
current_totals.emplace_back(std::move(target));
}
else
{
/// Not an aggregate function state. Just create a column with default value.
MutableColumnPtr new_column = elem.type->createColumn();
elem.type->insertDefaultInto(*new_column);
current_totals.emplace_back(std::move(new_column));
}
// Create a column with default value
MutableColumnPtr new_column = elem.type->createColumn();
elem.type->insertDefaultInto(*new_column);
current_totals.emplace_back(std::move(new_column));
}
}
@ -161,34 +144,35 @@ Block TotalsHavingBlockInputStream::readImpl()
}
void TotalsHavingBlockInputStream::addToTotals(const Block & block, const IColumn::Filter * filter)
void TotalsHavingBlockInputStream::addToTotals(const Block & source_block, const IColumn::Filter * filter)
{
for (size_t i = 0, num_columns = block.columns(); i < num_columns; ++i)
for (size_t i = 0, num_columns = source_block.columns(); i < num_columns; ++i)
{
const ColumnWithTypeAndName & current = block.getByPosition(i);
if (const ColumnAggregateFunction * column = typeid_cast<const ColumnAggregateFunction *>(current.column.get()))
const auto * source_column = typeid_cast<const ColumnAggregateFunction *>(
source_block.getByPosition(i).column.get());
if (!source_column)
{
auto & target = typeid_cast<ColumnAggregateFunction &>(*current_totals[i]);
IAggregateFunction * function = target.getAggregateFunction().get();
AggregateDataPtr data = target.getData()[0];
continue;
}
/// Accumulate all aggregate states into that value.
auto & totals_column = static_cast<ColumnAggregateFunction &>(*current_totals[i]);
assert(totals_column.size() == 1);
const ColumnAggregateFunction::Container & vec = column->getData();
size_t size = vec.size();
/// Accumulate all aggregate states from a column of a source block into
/// the corresponding totals column.
const auto & vec = source_column->getData();
size_t size = vec.size();
if (filter)
{
for (size_t j = 0; j < size; ++j)
if ((*filter)[j])
function->merge(data, vec[j], arena.get());
}
else
{
for (size_t j = 0; j < size; ++j)
function->merge(data, vec[j], arena.get());
}
if (filter)
{
for (size_t j = 0; j < size; ++j)
if ((*filter)[j])
totals_column.insertMergeFrom(vec[j]);
}
else
{
for (size_t j = 0; j < size; ++j)
totals_column.insertMergeFrom(vec[j]);
}
}
}

View File

@ -54,8 +54,6 @@ private:
/// Here, total values are accumulated. After the work is finished, they will be placed in IBlockInputStream::totals.
MutableColumns current_totals;
/// Arena for aggregate function states in totals.
ArenaPtr arena;
/// If filter == nullptr - add all rows. Otherwise, only the rows that pass the filter (HAVING).
void addToTotals(const Block & block, const IColumn::Filter * filter);

View File

@ -27,7 +27,7 @@ DatabaseDictionary::DatabaseDictionary(const String & name_)
{
}
void DatabaseDictionary::loadTables(Context &, ThreadPool *, bool)
void DatabaseDictionary::loadTables(Context &, bool)
{
}

View File

@ -33,7 +33,6 @@ public:
void loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;
bool isTableExist(

View File

@ -18,7 +18,6 @@ DatabaseMemory::DatabaseMemory(String name_)
void DatabaseMemory::loadTables(
Context & /*context*/,
ThreadPool * /*thread_pool*/,
bool /*has_force_restore_data_flag*/)
{
/// Nothing to load.

View File

@ -25,7 +25,6 @@ public:
void loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;
void createTable(

View File

@ -46,7 +46,7 @@ public:
throw Exception("MySQL database engine does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void loadTables(Context &, ThreadPool *, bool) override
void loadTables(Context &, bool) override
{
/// do nothing
}

View File

@ -119,7 +119,6 @@ DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_,
void DatabaseOrdinary::loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag)
{
using FileNames = std::vector<std::string>;
@ -161,96 +160,68 @@ void DatabaseOrdinary::loadTables(
*/
std::sort(file_names.begin(), file_names.end());
size_t total_tables = file_names.size();
const size_t total_tables = file_names.size();
LOG_INFO(log, "Total " << total_tables << " tables.");
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
Poco::Event all_tables_processed;
ExceptionHandler exception_handler;
auto task_function = [&](const String & table)
auto loadOneTable = [&](const String & table)
{
SCOPE_EXIT(
if (++tables_processed == total_tables)
all_tables_processed.set()
);
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
if (++tables_processed % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
};
for (const auto & filename : file_names)
{
auto task = createExceptionHandledJob(std::bind(task_function, filename), exception_handler);
ThreadPool pool(SettingMaxThreads().getAutoValue());
if (thread_pool)
thread_pool->schedule(task);
else
task();
for (const auto & file_name : file_names)
{
pool.schedule([&]() { loadOneTable(file_name); });
}
if (thread_pool)
all_tables_processed.wait();
exception_handler.throwIfException();
pool.wait();
/// After all tables was basically initialized, startup them.
startupTables(thread_pool);
startupTables(pool);
}
void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
{
LOG_INFO(log, "Starting up tables.");
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
size_t total_tables = tables.size();
Poco::Event all_tables_processed;
ExceptionHandler exception_handler;
const size_t total_tables = tables.size();
if (!total_tables)
return;
auto task_function = [&](const StoragePtr & table)
{
SCOPE_EXIT(
if (++tables_processed == total_tables)
all_tables_processed.set()
);
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
auto startupOneTable = [&](const StoragePtr & table)
{
table->startup();
if (++tables_processed % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
table->startup();
};
for (const auto & name_storage : tables)
for (const auto & table : tables)
{
auto task = createExceptionHandledJob(std::bind(task_function, name_storage.second), exception_handler);
if (thread_pool)
thread_pool->schedule(task);
else
task();
thread_pool.schedule([&]() { startupOneTable(table.second); });
}
if (thread_pool)
all_tables_processed.wait();
exception_handler.throwIfException();
thread_pool.wait();
}
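The schedule-then-wait pattern used above, sketched with standard facilities (std::async stands in for ClickHouse's internal ThreadPool; loadOneTable refers to the lambda in the code above):

    #include <future>
    #include <string>
    #include <vector>

    void loadAllTables(const std::vector<std::string> & names)
    {
        std::vector<std::future<void>> tasks;
        tasks.reserve(names.size());
        for (const auto & name : names)
            tasks.emplace_back(std::async(std::launch::async, [name] { /* loadOneTable(name) */ }));
        for (auto & task : tasks)
            task.get(); /// analogous to pool.wait(); get() rethrows a task's exception, if any
    }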

View File

@ -19,7 +19,6 @@ public:
void loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;
void createTable(
@ -73,7 +72,7 @@ private:
const String data_path;
Poco::Logger * log;
void startupTables(ThreadPool * thread_pool);
void startupTables(ThreadPool & thread_pool);
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const;
};

View File

@ -9,6 +9,7 @@
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <sstream>
@ -68,6 +69,13 @@ std::pair<String, StoragePtr> createTableFromDefinition(
ast_create_query.attach = true;
ast_create_query.database = database_name;
if (ast_create_query.as_table_function)
{
const auto & table_function = ast_create_query.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
StoragePtr storage = factory.get(table_function.name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table);
return {ast_create_query.table, storage};
}
/// We do not directly use `InterpreterCreateQuery::execute`, because
/// - the database has not been created yet;
/// - the code is simpler, since the query is already brought to a suitable form.

View File

@ -56,11 +56,10 @@ public:
/// Get name of database engine.
virtual String getEngineName() const = 0;
/// Load a set of existing tables. If thread_pool is specified, use it.
/// Load a set of existing tables.
/// You can call only once, right after the object is created.
virtual void loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) = 0;
/// Check the existence of the table.

View File

@ -30,8 +30,6 @@ public:
const DictionaryLifetime dict_lifetime,
const size_t size);
std::exception_ptr getCreationException() const override { return {}; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "Cache"; }
@ -62,8 +60,6 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@ -284,8 +280,6 @@ private:
mutable std::atomic<size_t> element_count{0};
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
const std::chrono::time_point<std::chrono::system_clock> creation_time = std::chrono::system_clock::now();
};
}

View File

@ -115,8 +115,7 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
else
{
update_time = std::chrono::system_clock::now();
std::string str_time("0000-00-00 00:00:00"); ///for initial load
return query_builder.composeUpdateQuery(update_field, str_time);
return query_builder.composeLoadAllQuery();
}
}

View File

@ -50,8 +50,6 @@ public:
std::string getKeyDescription() const { return key_description; }
std::exception_ptr getCreationException() const override { return {}; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "ComplexKeyCache"; }
@ -86,8 +84,6 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;

View File

@ -29,18 +29,8 @@ ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(
, saved_block{std::move(saved_block)}
{
createAttributes();
try
{
loadData();
calculateBytesAllocated();
}
catch (...)
{
creation_exception = std::current_exception();
}
creation_time = std::chrono::system_clock::now();
loadData();
calculateBytesAllocated();
}
#define DECLARE(TYPE) \

View File

@ -32,8 +32,6 @@ public:
std::string getKeyDescription() const { return key_description; }
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "ComplexKeyHashed"; }
@ -61,8 +59,6 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@ -255,10 +251,6 @@ private:
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};

View File

@ -167,7 +167,7 @@ std::string ExternalQueryBuilder::composeUpdateQuery(const std::string & update_
else
update_query = " WHERE " + update_field + " >= '" + time_point + "'";
return out.insert(out.size() - 1, update_query); ///This is done to insert "update_query" before "out"'s semicolon
return out.insert(out.size() - 1, update_query); /// This is done to insert "update_query" before "out"'s semicolon
}

View File

@ -35,7 +35,7 @@ struct ExternalQueryBuilder
/** Generate a query to load all data. */
std::string composeLoadAllQuery() const;
/** Generate a query to load data after certain time point*/
/** Generate a query to load data after a certain time point */
std::string composeUpdateQuery(const std::string & update_field, const std::string & time_point) const;
/** Generate a query to load data by set of UInt64 keys. */

View File

@ -36,18 +36,8 @@ FlatDictionary::FlatDictionary(
, saved_block{std::move(saved_block)}
{
createAttributes();
try
{
loadData();
calculateBytesAllocated();
}
catch (...)
{
creation_exception = std::current_exception();
}
creation_time = std::chrono::system_clock::now();
loadData();
calculateBytesAllocated();
}

View File

@ -29,8 +29,6 @@ public:
bool require_nonempty,
BlockPtr saved_block = nullptr);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "Flat"; }
@ -58,8 +56,6 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@ -244,10 +240,6 @@ private:
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};

View File

@ -66,7 +66,6 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri)
else
{
update_time = std::chrono::system_clock::now();
uri.addQueryParameter(update_field, "0000-00-00 00:00:00");
}
}

View File

@ -30,18 +30,8 @@ HashedDictionary::HashedDictionary(
, saved_block{std::move(saved_block)}
{
createAttributes();
try
{
loadData();
calculateBytesAllocated();
}
catch (...)
{
creation_exception = std::current_exception();
}
creation_time = std::chrono::system_clock::now();
loadData();
calculateBytesAllocated();
}

View File

@ -28,8 +28,6 @@ public:
bool require_nonempty,
BlockPtr saved_block = nullptr);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "Hashed"; }
@ -57,8 +55,6 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@ -248,10 +244,6 @@ private:
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};

View File

@ -109,8 +109,7 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
else
{
update_time = std::chrono::system_clock::now();
std::string str_time("0000-00-00 00:00:00"); ///for initial load
return query_builder.composeUpdateQuery(update_field, str_time);
return query_builder.composeLoadAllQuery();
}
}

View File

@ -80,18 +80,8 @@ RangeHashedDictionary::RangeHashedDictionary(
, require_nonempty(require_nonempty)
{
createAttributes();
try
{
loadData();
calculateBytesAllocated();
}
catch (...)
{
creation_exception = std::current_exception();
}
creation_time = std::chrono::system_clock::now();
loadData();
calculateBytesAllocated();
}

View File

@ -24,8 +24,6 @@ public:
const DictionaryLifetime dict_lifetime,
bool require_nonempty);
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return dictionary_name; }
std::string getTypeName() const override { return "RangeHashed"; }
@ -53,8 +51,6 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@ -227,10 +223,6 @@ private:
size_t element_count = 0;
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
};
}

View File

@ -33,8 +33,6 @@ public:
std::string getKeyDescription() const { return key_description; }
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "Trie"; }
@ -62,8 +60,6 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;

View File

@ -127,8 +127,7 @@ std::string XDBCDictionarySource::getUpdateFieldAndDate()
else
{
update_time = std::chrono::system_clock::now();
std::string str_time("0000-00-00 00:00:00"); ///for initial load
return query_builder.composeUpdateQuery(update_field, str_time);
return query_builder.composeLoadAllQuery();
}
}

View File

@ -41,11 +41,12 @@ namespace
constexpr UInt64 END_OF_GROUP = static_cast<UInt64>(-2);
Int64 decodeZigZag(UInt64 n) { return static_cast<Int64>((n >> 1) ^ (~(n & 1) + 1)); }
}
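A small self-checking sketch of the ZigZag mapping decoded above (encodeZigZag here is the assumed inverse, written for illustration):

    #include <cassert>
    #include <cstdint>

    uint64_t encodeZigZag(int64_t n) { return (static_cast<uint64_t>(n) << 1) ^ static_cast<uint64_t>(n >> 63); }
    int64_t decodeZigZag(uint64_t n) { return static_cast<int64_t>((n >> 1) ^ (~(n & 1) + 1)); }

    int main()
    {
        /// Small signed values map to small unsigned values: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
        assert(encodeZigZag(0) == 0 && encodeZigZag(-1) == 1 && encodeZigZag(1) == 2 && encodeZigZag(-2) == 3);
        assert(decodeZigZag(encodeZigZag(-123456789)) == -123456789);
        return 0;
    }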
[[noreturn]] void unknownFormat()
{
throw Exception("Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.", ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}
[[noreturn]] void ProtobufReader::SimpleReader::throwUnknownFormat()
{
throw Exception("Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.", ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}
@ -67,7 +68,10 @@ bool ProtobufReader::SimpleReader::startMessage()
if (unlikely(in.eof()))
return false;
size_t size_of_message = readVarint();
if (size_of_message == 0)
throwUnknownFormat();
current_message_end = cursor + size_of_message;
root_message_end = current_message_end;
}
else
{
@ -91,7 +95,7 @@ void ProtobufReader::SimpleReader::endMessage()
else if (unlikely(cursor > current_message_end))
{
if (!parent_message_ends.empty())
unknownFormat();
throwUnknownFormat();
moveCursorBackward(cursor - current_message_end);
}
current_message_end = REACHED_END;
@ -141,7 +145,7 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)
UInt64 varint = readVarint();
if (unlikely(varint & (static_cast<UInt64>(0xFFFFFFFF) << 32)))
unknownFormat();
throwUnknownFormat();
UInt32 key = static_cast<UInt32>(varint);
field_number = (key >> 3);
WireType wire_type = static_cast<WireType>(key & 0x07);
@ -171,7 +175,7 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)
case GROUP_END:
{
if (current_message_end != END_OF_GROUP)
unknownFormat();
throwUnknownFormat();
current_message_end = REACHED_END;
return false;
}
@ -181,7 +185,7 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)
return true;
}
}
unknownFormat();
throwUnknownFormat();
__builtin_unreachable();
}
@ -257,7 +261,7 @@ void ProtobufReader::SimpleReader::ignore(UInt64 num_bytes)
void ProtobufReader::SimpleReader::moveCursorBackward(UInt64 num_bytes)
{
if (in.offset() < num_bytes)
unknownFormat();
throwUnknownFormat();
in.position() -= num_bytes;
cursor -= num_bytes;
}
@ -294,7 +298,7 @@ UInt64 ProtobufReader::SimpleReader::continueReadingVarint(UInt64 first_byte)
PROTOBUF_READER_READ_VARINT_BYTE(10)
#undef PROTOBUF_READER_READ_VARINT_BYTE
unknownFormat();
throwUnknownFormat();
__builtin_unreachable();
}
@ -327,7 +331,7 @@ void ProtobufReader::SimpleReader::ignoreVarint()
PROTOBUF_READER_IGNORE_VARINT_BYTE(10)
#undef PROTOBUF_READER_IGNORE_VARINT_BYTE
unknownFormat();
throwUnknownFormat();
}
void ProtobufReader::SimpleReader::ignoreGroup()
@ -371,11 +375,10 @@ void ProtobufReader::SimpleReader::ignoreGroup()
break;
}
}
unknownFormat();
throwUnknownFormat();
}
}
// Implementation for a converter from any protobuf field type to any DB data type.
class ProtobufReader::ConverterBaseImpl : public ProtobufReader::IConverter
{

View File

@ -97,10 +97,19 @@ private:
bool readUInt(UInt64 & value);
template<typename T> bool readFixed(T & value);
bool readStringInto(PaddedPODArray<UInt8> & str);
bool ALWAYS_INLINE maybeCanReadValue() const { return field_end != REACHED_END; }
bool ALWAYS_INLINE maybeCanReadValue() const
{
if (field_end == REACHED_END)
return false;
if (cursor < root_message_end)
return true;
throwUnknownFormat();
}
private:
void readBinary(void* data, size_t size);
void readBinary(void * data, size_t size);
void ignore(UInt64 num_bytes);
void moveCursorBackward(UInt64 num_bytes);
@ -119,6 +128,8 @@ private:
void ignoreVarint();
void ignoreGroup();
[[noreturn]] static void throwUnknownFormat();
static constexpr UInt64 REACHED_END = 0;
ReadBuffer & in;
@ -126,6 +137,8 @@ private:
std::vector<UInt64> parent_message_ends;
UInt64 current_message_end;
UInt64 field_end;
UInt64 root_message_end;
};
class IConverter

View File

@ -553,14 +553,13 @@ class FunctionBinaryArithmetic : public IFunction
AggregateFunctionPtr function = column.getAggregateFunction();
auto arena = std::make_shared<Arena>();
size_t size = agg_state_is_const ? 1 : input_rows_count;
auto column_to = ColumnAggregateFunction::create(function, Arenas(1, arena));
auto column_to = ColumnAggregateFunction::create(function);
column_to->reserve(size);
auto column_from = ColumnAggregateFunction::create(function, Arenas(1, arena));
auto column_from = ColumnAggregateFunction::create(function);
column_from->reserve(size);
for (size_t i = 0; i < size; ++i)
@ -574,6 +573,12 @@ class FunctionBinaryArithmetic : public IFunction
UInt64 m = typeid_cast<const ColumnConst *>(block.getByPosition(new_arguments[1]).column.get())->getValue<UInt64>();
// Since we merge the function states ourselves, we need an Arena for this.
// Pass it to the resulting column so that the arena has a proper lifetime.
auto arena = std::make_shared<Arena>();
column_to->addArena(arena);
/// We use exponentiation by squaring algorithm to perform multiplying aggregate states by N in O(log(N)) operations
/// https://en.wikipedia.org/wiki/Exponentiation_by_squaring
while (m)
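The hunk is cut off after the loop header; for clarity, here is a generic sketch of the referenced exponentiation-by-squaring loop, with integer addition standing in for merging aggregate states (so the function computes m·x in O(log m) merges):

    #include <cstdint>

    uint64_t mergeNTimes(uint64_t x, uint64_t m)
    {
        uint64_t result = 0;   /// identity element of the merge operation (here: +)
        while (m)
        {
            if (m & 1)
                result += x;   /// merge(result, x)
            x += x;            /// merge(x, x): double the accumulated state
            m >>= 1;
        }
        return result;
    }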

View File

@ -1548,13 +1548,15 @@ public:
ColumnString::Chars & vec_res_upper_range = col_res_upper_range->getChars();
vec_res_upper_range.resize(input_rows_count * IPV6_BINARY_LENGTH);
static constexpr UInt8 max_cidr_mask = IPV6_BINARY_LENGTH * 8;
for (size_t offset = 0; offset < input_rows_count; ++offset)
{
const size_t offset_ipv6 = offset * IPV6_BINARY_LENGTH;
UInt8 cidr = col_const_cidr_in
? col_const_cidr_in->getValue<UInt8>()
: col_cidr_in->getData()[offset];
cidr = std::min(cidr, max_cidr_mask);
applyCIDRMask(&vec_in[offset_ipv6], &vec_res_lower_range[offset_ipv6], &vec_res_upper_range[offset_ipv6], cidr);
}

View File

@ -0,0 +1,12 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsIntrospection.h>
namespace DB
{
void registerFunctionsIntrospection(FunctionFactory & factory)
{
factory.registerFunction<FunctionSymbolizeTrace>();
}
}

View File

@ -0,0 +1,107 @@
#pragma once
#include <common/StackTrace.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
class FunctionSymbolizeTrace : public IFunction
{
public:
static constexpr auto name = "symbolizeTrace";
static FunctionPtr create(const Context &)
{
return std::make_shared<FunctionSymbolizeTrace>();
}
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override
{
return 1;
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1)
throw Exception("Function " + getName() + " needs exactly one argument; passed "
+ toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto array_type = checkAndGetDataType<DataTypeArray>(arguments[0].type.get());
if (!array_type)
throw Exception("The only argument for function " + getName() + " must be array. Found "
+ arguments[0].type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
DataTypePtr nested_type = array_type->getNestedType();
if (!WhichDataType(nested_type).isUInt64())
throw Exception("The only argument for function " + getName() + " must be array of UInt64. Found "
+ arguments[0].type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeString>();
}
bool useDefaultImplementationForConstants() const override
{
return true;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
{
const ColumnPtr column = block.getByPosition(arguments[0]).column;
const ColumnArray * column_array = checkAndGetColumn<ColumnArray>(column.get());
if (!column_array)
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
const ColumnPtr data_ptr = column_array->getDataPtr();
const ColumnVector<UInt64> * data_vector = checkAndGetColumn<ColumnVector<UInt64>>(&*data_ptr);
const typename ColumnVector<UInt64>::Container & data = data_vector->getData();
const ColumnArray::Offsets & offsets = column_array->getOffsets();
auto result_column = ColumnString::create();
StackTrace::Frames frames;
size_t current_offset = 0;
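/// offsets[i] is the end position of row i in the flattened data array;
/// frames is a fixed-capacity buffer, so the copy below is clamped to frames.size().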
for (size_t i = 0; i < offsets.size(); ++i)
{
size_t current_size = 0;
for (; current_size < frames.size() && current_offset + current_size < offsets[i]; ++current_size)
{
frames[current_size] = reinterpret_cast<void *>(data[current_offset + current_size]);
}
std::string backtrace = StackTrace(frames.begin(), frames.begin() + current_size).toString();
result_column->insertDataWithTerminatingZero(backtrace.c_str(), backtrace.length() + 1);
current_offset = offsets[i];
}
block.getByPosition(result).column = std::move(result_column);
}
};
}

View File

@ -36,6 +36,7 @@ void registerFunctionsURL(FunctionFactory &);
void registerFunctionsVisitParam(FunctionFactory &);
void registerFunctionsMath(FunctionFactory &);
void registerFunctionsGeo(FunctionFactory &);
void registerFunctionsIntrospection(FunctionFactory &);
void registerFunctionsNull(FunctionFactory &);
void registerFunctionsFindCluster(FunctionFactory &);
void registerFunctionsJSON(FunctionFactory &);
@ -74,6 +75,7 @@ void registerFunctions()
registerFunctionsVisitParam(factory);
registerFunctionsMath(factory);
registerFunctionsGeo(factory);
registerFunctionsIntrospection(factory);
registerFunctionsNull(factory);
registerFunctionsFindCluster(factory);
registerFunctionsJSON(factory);

View File

@ -4,6 +4,7 @@
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/FieldVisitors.h>
#include <common/sleep.h>
#include <IO/WriteHelpers.h>
@ -86,8 +87,8 @@ public:
if (seconds > 3.0) /// The choice is arbitrary
throw Exception("The maximum sleep time is 3 seconds. Requested: " + toString(seconds), ErrorCodes::TOO_SLOW);
UInt64 useconds = seconds * (variant == FunctionSleepVariant::PerBlock ? 1 : size) * 1e6;
::usleep(useconds);
UInt64 microseconds = seconds * (variant == FunctionSleepVariant::PerBlock ? 1 : size) * 1e6;
sleepForMicroseconds(microseconds);
}
/// convertToFullColumn needed, because otherwise (constant expression case) function will not get called on each block.

View File

@ -16,7 +16,7 @@ HDFSBuilderPtr createHDFSBuilder(const Poco::URI & uri)
auto & host = uri.getHost();
auto port = uri.getPort();
auto & path = uri.getPath();
if (host.empty() || port == 0 || path.empty())
if (host.empty() || path.empty())
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
HDFSBuilderPtr builder(hdfsNewBuilder());

View File

@ -4,6 +4,7 @@
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <Parsers/ASTColumnsMatcher.h>
namespace DB
{
@ -24,9 +25,11 @@ struct AsteriskSemantic
static void setAliases(ASTAsterisk & node, const RevertedAliasesPtr & aliases) { node.semantic = makeSemantic(aliases); }
static void setAliases(ASTQualifiedAsterisk & node, const RevertedAliasesPtr & aliases) { node.semantic = makeSemantic(aliases); }
static void setAliases(ASTColumnsMatcher & node, const RevertedAliasesPtr & aliases) { node.semantic = makeSemantic(aliases); }
static RevertedAliasesPtr getAliases(const ASTAsterisk & node) { return node.semantic ? node.semantic->aliases : nullptr; }
static RevertedAliasesPtr getAliases(const ASTQualifiedAsterisk & node) { return node.semantic ? node.semantic->aliases : nullptr; }
static RevertedAliasesPtr getAliases(const ASTColumnsMatcher & node) { return node.semantic ? node.semantic->aliases : nullptr; }
private:
static std::shared_ptr<AsteriskSemanticImpl> makeSemantic(const RevertedAliasesPtr & aliases)

View File

@ -159,10 +159,14 @@ void AsynchronousMetrics::update()
size_t max_part_count_for_partition = 0;
size_t number_of_databases = databases.size();
size_t total_number_of_tables = 0;
for (const auto & db : databases)
{
for (auto iterator = db.second->getIterator(context); iterator->isValid(); iterator->next())
{
++total_number_of_tables;
auto & table = iterator->table();
StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get());
StorageReplicatedMergeTree * table_replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
@ -213,6 +217,9 @@ void AsynchronousMetrics::update()
set("ReplicasMaxRelativeDelay", max_relative_delay);
set("MaxPartCountForPartition", max_part_count_for_partition);
set("NumberOfDatabases", number_of_databases);
set("NumberOfTables", total_number_of_tables);
}
#if USE_TCMALLOC

View File

@ -504,20 +504,6 @@ std::shared_ptr<CatBoostLibHolder> getCatBoostWrapperHolder(const std::string &
CatBoostModel::CatBoostModel(std::string name_, std::string model_path_, std::string lib_path_,
const ExternalLoadableLifetime & lifetime)
: name(std::move(name_)), model_path(std::move(model_path_)), lib_path(std::move(lib_path_)), lifetime(lifetime)
{
try
{
init();
}
catch (...)
{
creation_exception = std::current_exception();
}
creation_time = std::chrono::system_clock::now();
}
void CatBoostModel::init()
{
api_provider = getCatBoostWrapperHolder(lib_path);
api = &api_provider->getAPI();

View File

@ -68,9 +68,6 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override;
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
std::exception_ptr getCreationException() const override { return creation_exception; }
private:
std::string name;
std::string model_path;
@ -85,9 +82,6 @@ private:
size_t cat_features_count;
size_t tree_count;
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
void init();
};

View File

@ -99,7 +99,8 @@ void SelectStreamFactory::createForShard(
if (table_func_ptr)
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
}
else
main_table_storage = context.tryGetTable(main_table.database, main_table.table);

View File

@ -41,6 +41,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Common/DNSResolver.h>
@ -53,6 +54,7 @@
#include <Common/Config/ConfigProcessor.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
#include <Common/TraceCollector.h>
#include <common/logger_useful.h>
@ -153,6 +155,8 @@ struct ContextShared
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
std::optional<SystemLogs> system_logs; /// Used to log queries and operations on parts
std::unique_ptr<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
class SessionKeyHash
@ -285,6 +289,22 @@ struct ContextShared
background_pool.reset();
schedule_pool.reset();
ddl_worker.reset();
/// Stop trace collector if any
trace_collector.reset();
}
bool hasTraceCollector()
{
return trace_collector != nullptr;
}
void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
{
if (trace_log == nullptr)
return;
trace_collector = std::make_unique<TraceCollector>(trace_log);
}
private:
@ -497,7 +517,6 @@ DatabasePtr Context::tryGetDatabase(const String & database_name)
return it->second;
}
String Context::getPath() const
{
auto lock = getLock();
@ -963,7 +982,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this);
/// Run it and remember the result
res = table_function_ptr->execute(table_expression, *this);
res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName());
}
return res;
@ -1623,6 +1642,16 @@ void Context::initializeSystemLogs()
shared->system_logs.emplace(*global_context, getConfigRef());
}
bool Context::hasTraceCollector()
{
return shared->hasTraceCollector();
}
void Context::initializeTraceCollector()
{
shared->initializeTraceCollector(getTraceLog());
}
std::shared_ptr<QueryLog> Context::getQueryLog()
{
@ -1663,6 +1692,7 @@ std::shared_ptr<PartLog> Context::getPartLog(const String & part_database)
return shared->system_logs->part_log;
}
std::shared_ptr<TextLog> Context::getTextLog()
{
auto lock = getLock();
@ -1672,6 +1702,17 @@ std::shared_ptr<TextLog> Context::getTextLog()
return shared->system_logs->text_log;
}
std::shared_ptr<TraceLog> Context::getTraceLog()
{
auto lock = getLock();
if (!shared->system_logs || !shared->system_logs->trace_log)
return nullptr;
return shared->system_logs->trace_log;
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const

View File

@ -63,6 +63,7 @@ class QueryLog;
class QueryThreadLog;
class PartLog;
class TextLog;
class TraceLog;
struct MergeTreeSettings;
class IDatabase;
class DDLGuard;
@ -421,11 +422,16 @@ public:
/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();
void initializeTraceCollector();
bool hasTraceCollector();
/// Nullptr if the query log is not ready for this moment.
std::shared_ptr<QueryLog> getQueryLog();
std::shared_ptr<QueryThreadLog> getQueryThreadLog();
std::shared_ptr<TextLog> getTextLog();
std::shared_ptr<TraceLog> getTraceLog();
/// Returns an object used to log operations with parts if it is possible.
/// Provide the table name to make the required checks.
std::shared_ptr<PartLog> getPartLog(const String & part_database);

View File

@ -22,6 +22,7 @@
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
#include <common/sleep.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
@ -953,7 +954,7 @@ void DDLWorker::runMainThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Avoid busy loop when ZooKeeper is not available.
::sleep(1);
sleepForSeconds(1);
}
}
catch (...)
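::sleep(1) is swapped for sleepForSeconds(1) from the newly included common/sleep.h. That header is not shown in this diff; a plausible minimal sketch of such a helper, assuming it is a thin wrapper over nanosleep (the real implementation may differ, e.g. in how it handles signal interruption):

#include <stdint.h>
#include <time.h>

/// Sketch of a sleepForSeconds-style helper: routing all sleeps through one
/// nanosecond-based entry point keeps the behaviour uniform across callers.
inline void sleepForNanoseconds(uint64_t nanoseconds)
{
    struct timespec ts;
    ts.tv_sec = static_cast<time_t>(nanoseconds / 1000000000ULL);
    ts.tv_nsec = static_cast<long>(nanoseconds % 1000000000ULL);
    nanosleep(&ts, nullptr);
}

inline void sleepForSeconds(uint64_t seconds)
{
    sleepForNanoseconds(seconds * 1000000000ULL);
}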

View File

@ -6,7 +6,6 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h>

View File

@ -219,7 +219,7 @@ class ExternalLoader::LoadingDispatcher : private boost::noncopyable
{
public:
/// Called to load or reload an object.
using CreateObjectFunction = std::function<ObjectWithException(
using CreateObjectFunction = std::function<LoadablePtr(
const String & /* name */, const ObjectConfig & /* config */, bool config_changed, const LoadablePtr & /* previous_version */)>;
/// Called after loading/reloading an object to calculate the time of the next update.
@ -511,12 +511,14 @@ public:
{
std::lock_guard lock{mutex};
for (auto & [name, info] : infos)
{
if ((info.was_loading() || load_never_loading) && filter_by_name(name))
{
cancelLoading(info);
info.forced_to_reload = true;
startLoading(name, info);
}
}
}
/// Starts reloading of all the objects.
@ -528,20 +530,22 @@ public:
/// The function doesn't touch the objects which were never tried to load.
void reloadOutdated()
{
        /// Iterate through all the objects and find the loaded ones that should be checked for modification.
std::unordered_map<LoadablePtr, bool> is_modified_map;
{
std::lock_guard lock{mutex};
TimePoint now = std::chrono::system_clock::now();
for (const auto & name_and_info : infos)
{
const auto & info = name_and_info.second;
if ((now >= info.next_update_time) && !info.loading() && info.was_loading())
if ((now >= info.next_update_time) && !info.loading() && info.loaded())
is_modified_map.emplace(info.object, true);
}
}
/// The `mutex` should be unlocked while we're calling the function is_object_modified().
/// Find out which of the loaded objects were modified.
/// We couldn't perform these checks while we were building `is_modified_map` because
/// the `mutex` should be unlocked while we're calling the function is_object_modified().
for (auto & [object, is_modified_flag] : is_modified_map)
{
try
@ -554,21 +558,38 @@ public:
}
}
/// Iterate through all the objects again and either start loading or just set `next_update_time`.
{
std::lock_guard lock{mutex};
TimePoint now = std::chrono::system_clock::now();
for (auto & [name, info] : infos)
if ((now >= info.next_update_time) && !info.loading() && info.was_loading())
{
if ((now >= info.next_update_time) && !info.loading())
{
auto it = is_modified_map.find(info.object);
if (it == is_modified_map.end())
                    continue; /// Object has just been added; it can simply be omitted from this round of updating outdated objects.
bool is_modified_flag = it->second;
if (info.loaded() && !is_modified_flag)
info.next_update_time = calculate_next_update_time(info.object, info.error_count);
else
if (info.loaded())
{
auto it = is_modified_map.find(info.object);
if (it == is_modified_map.end())
                        continue; /// Object has just been loaded (it wasn't loaded yet while we were building `is_modified_map`), so we don't have to reload it right now.
bool is_modified_flag = it->second;
if (!is_modified_flag)
{
/// Object wasn't modified so we only have to set `next_update_time`.
info.next_update_time = calculate_next_update_time(info.object, info.error_count);
continue;
}
/// Object was modified and should be reloaded.
startLoading(name, info);
}
else if (info.failed())
{
/// Object was never loaded successfully and should be reloaded.
startLoading(name, info);
}
}
}
}
}
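The reworked reloadOutdated() is a three-phase locking pattern: snapshot the candidates under the mutex, run the potentially slow is_object_modified() checks with the mutex released, then re-acquire the mutex and act on the snapshot, skipping any object whose state changed in between (it simply won't be in the map). Reduced to that shape with toy types, the pattern looks like this (a sketch, not the real loader code):

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

struct Object { bool modified_on_disk = false; };
using ObjectPtr = std::shared_ptr<Object>;
struct Info { ObjectPtr object; bool loaded = true; };

std::mutex mutex;
std::unordered_map<std::string, Info> infos;

bool isObjectModified(const ObjectPtr & object) { return object->modified_on_disk; }  /// slow; must run unlocked
void startLoading(const std::string & /*name*/, Info & /*info*/) {}  /// kicks off a reload

void reloadOutdatedSketch()
{
    /// Phase 1: snapshot the objects to check, under the lock.
    std::unordered_map<ObjectPtr, bool> is_modified_map;
    {
        std::lock_guard lock{mutex};
        for (const auto & [name, info] : infos)
            if (info.loaded)
                is_modified_map.emplace(info.object, true);
    }

    /// Phase 2: the expensive check runs with the lock released.
    for (auto & [object, flag] : is_modified_map)
        flag = isObjectModified(object);

    /// Phase 3: re-acquire the lock; objects added or reloaded meanwhile are
    /// absent from the snapshot and are skipped until the next round.
    std::lock_guard lock{mutex};
    for (auto & [name, info] : infos)
    {
        auto it = is_modified_map.find(info.object);
        if (it == is_modified_map.end())
            continue;
        if (it->second)
            startLoading(name, info);
    }
}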
@ -783,7 +804,7 @@ private:
std::exception_ptr new_exception;
try
{
std::tie(new_object, new_exception) = create_object(name, config, config_changed, previous_version);
new_object = create_object(name, config, config_changed, previous_version);
}
catch (...)
{
@ -792,8 +813,6 @@ private:
if (!new_object && !new_exception)
throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
if (new_object && new_exception)
new_object = nullptr;
/// Calculate a new update time.
TimePoint next_update_time;
@ -1152,17 +1171,13 @@ void ExternalLoader::reload(bool load_never_loading)
loading_dispatcher->reload(load_never_loading);
}
ExternalLoader::ObjectWithException ExternalLoader::createObject(
ExternalLoader::LoadablePtr ExternalLoader::createObject(
const String & name, const ObjectConfig & config, bool config_changed, const LoadablePtr & previous_version) const
{
if (previous_version && !config_changed)
{
auto new_object = previous_version->clone();
return {new_object, new_object->getCreationException()};
}
return previous_version->clone();
auto new_object = create(name, *config.config, config.key_in_config);
return {new_object, new_object->getCreationException()};
return create(name, *config.config, config.key_in_config);
}
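createObject() now either returns a valid object or throws, instead of packing a {object, exception} pair. The single call site in loadSingleObject() (in the hunk above) preserves the old observable behaviour by capturing the failure as an exception_ptr, roughly:

/// Sketch of the caller side after this change: the exception travels via
/// std::current_exception() instead of a pair member.
LoadablePtr new_object;
std::exception_ptr new_exception;
try
{
    new_object = create_object(name, config, config_changed, previous_version);
}
catch (...)
{
    new_exception = std::current_exception();
}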
ExternalLoader::TimePoint ExternalLoader::calculateNextUpdateTime(const LoadablePtr & loaded_object, size_t error_count) const

View File

@ -186,10 +186,8 @@ protected:
private:
struct ObjectConfig;
using ObjectWithException = std::pair<LoadablePtr, std::exception_ptr>;
ObjectWithException
createObject(const String & name, const ObjectConfig & config, bool config_changed, const LoadablePtr & previous_version) const;
LoadablePtr createObject(const String & name, const ObjectConfig & config, bool config_changed, const LoadablePtr & previous_version) const;
TimePoint calculateNextUpdateTime(const LoadablePtr & loaded_object, size_t error_count) const;
class ConfigFilesReader;

View File

@ -1,6 +1,5 @@
#pragma once
#include <chrono>
#include <string>
#include <memory>
#include <boost/noncopyable.hpp>
@ -41,10 +40,6 @@ public:
virtual bool isModified() const = 0;
    /// Returns a new object with the same configuration. Used to update a modified object when its lifetime is exceeded.
virtual std::shared_ptr<const IExternalLoadable> clone() const = 0;
virtual std::chrono::time_point<std::chrono::system_clock> getCreationTime() const = 0;
virtual std::exception_ptr getCreationException() const = 0;
};
}

View File

@ -46,6 +46,8 @@
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
@ -159,7 +161,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (need_write_metadata)
Poco::File(metadata_file_tmp_path).renameTo(metadata_file_path);
database->loadTables(context, thread_pool, has_force_restore_data_flag);
database->loadTables(context, has_force_restore_data_flag);
}
catch (...)
{
@ -518,34 +520,45 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
StoragePtr as_storage;
TableStructureReadLockHolder as_storage_lock;
if (!as_table_name.empty())
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
}
/// Set and retrieve list of columns.
ColumnsDescription columns = setColumns(create, as_select_sample, as_storage);
ColumnsDescription columns;
StoragePtr res;
    /// Check LowCardinality types in the table being created if they are not allowed by the setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types)
if (create.as_table_function)
{
for (const auto & name_and_type_pair : columns.getAllPhysical())
const auto & table_function = create.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table);
}
else
{
/// Set and retrieve list of columns.
columns = setColumns(create, as_select_sample, as_storage);
        /// Check LowCardinality types in the table being created if they are not allowed by the setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types)
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
for (const auto & name_and_type_pair : columns.getAllPhysical())
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default due to expected negative impact on performance. It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default due to expected negative impact on performance. It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
/// Set the table engine if it was not specified explicitly.
setEngine(create);
}
/// Set the table engine if it was not specified explicitly.
setEngine(create);
StoragePtr res;
{
std::unique_ptr<DDLGuard> guard;
@ -585,15 +598,18 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
create.attach,
false);
if (!create.as_table_function)
{
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
create.attach,
false);
}
if (create.temporary)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);

View File

@ -31,11 +31,6 @@ public:
static ASTPtr formatIndices(const IndicesDescription & indices);
void setDatabaseLoadingThreadpool(ThreadPool & thread_pool_)
{
thread_pool = &thread_pool_;
}
void setForceRestoreData(bool has_force_restore_data_flag_)
{
has_force_restore_data_flag = has_force_restore_data_flag_;
@ -61,9 +56,6 @@ private:
ASTPtr query_ptr;
Context & context;
/// Using while loading database.
ThreadPool * thread_pool = nullptr;
/// Skip safety threshold when loading tables.
bool has_force_restore_data_flag = false;
/// Is this an internal query - not from the user.

View File

@ -79,7 +79,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
const auto & table_function = table_expression.table_function->as<ASTFunction &>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function.name, context);
/// Run the table function and remember the result
table = table_function_ptr->execute(table_expression.table_function, context);
table = table_function_ptr->execute(table_expression.table_function, context, table_function_ptr->getName());
}
else
{
@ -101,6 +101,9 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
for (const auto & column : columns)
{
if (column.is_virtual)
continue;
res_columns[0]->insert(column.name);
res_columns[1]->insert(column.type->getName());

View File

@ -48,7 +48,8 @@ StoragePtr InterpreterInsertQuery::getTable(const ASTInsertQuery & query)
{
const auto * table_function = query.table_function->as<ASTFunction>();
const auto & factory = TableFunctionFactory::instance();
return factory.get(table_function->name, context)->execute(query.table_function, context);
TableFunctionPtr table_function_ptr = factory.get(table_function->name, context);
return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
}
/// Into what table to write.

View File

@ -14,6 +14,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/TraceLog.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -221,7 +222,8 @@ BlockIO InterpreterSystemQuery::execute()
executeCommandsAndThrowIfError(
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); }
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); }
);
break;
case Type::STOP_LISTEN_QUERIES:

Some files were not shown because too many files have changed in this diff.