Merge branch 'master' into normalize-bigint

This commit is contained in:
Alexey Milovidov 2021-05-11 02:05:40 +03:00
commit b2ca5cd98b
73 changed files with 2300 additions and 1076 deletions

View File

@ -2,7 +2,7 @@ if (APPLE OR SPLIT_SHARED_LIBRARIES OR NOT ARCH_AMD64)
set (ENABLE_EMBEDDED_COMPILER OFF CACHE INTERNAL "")
endif()
option (ENABLE_EMBEDDED_COMPILER "Set to TRUE to enable support for 'compile_expressions' option for query execution" ${ENABLE_LIBRARIES})
option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ON)
# Broken in macos. TODO: update clang, re-test, enable on Apple
if (ENABLE_EMBEDDED_COMPILER AND NOT SPLIT_SHARED_LIBRARIES AND ARCH_AMD64 AND NOT (SANITIZE STREQUAL "undefined"))
option (USE_INTERNAL_LLVM_LIBRARY "Use bundled or system LLVM library." ${NOT_UNBUNDLED})

View File

@ -370,6 +370,10 @@ function run_tests
# Depends on AWS
01801_s3_cluster
# Depends on LLVM JIT
01852_jit_if
01865_jit_comparison_constant_result
)
(time clickhouse-test --hung-check -j 8 --order=random --use-skip-list --no-long --testname --shard --zookeeper --skip "${TESTS_TO_SKIP[@]}" -- "$FASTTEST_FOCUS" 2>&1 ||:) | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/test_log.txt"

View File

@ -14,11 +14,6 @@
<max_memory_usage>
<max>10G</max>
</max_memory_usage>
<!-- Not ready for production -->
<compile_expressions>
<readonly />
</compile_expressions>
</constraints>
</default>
</profiles>

View File

@ -139,6 +139,7 @@ The following settings can be specified in configuration file for given endpoint
- `endpoint` — Specifies prefix of an endpoint. Mandatory.
- `access_key_id` and `secret_access_key` — Specifies credentials to use with given endpoint. Optional.
- `region` — Specifies S3 region name. Optional.
- `use_environment_credentials` — If set to `true`, S3 client will try to obtain credentials from environment variables and Amazon EC2 metadata for given endpoint. Optional, default value is `false`.
- `use_insecure_imds_request` — If set to `true`, S3 client will use insecure IMDS request while obtaining credentials from Amazon EC2 metadata. Optional, default value is `false`.
- `header` — Adds specified HTTP header to a request to given endpoint. Optional, can be speficied multiple times.
@ -152,6 +153,7 @@ The following settings can be specified in configuration file for given endpoint
<endpoint>https://storage.yandexcloud.net/my-test-bucket-768/</endpoint>
<!-- <access_key_id>ACCESS_KEY_ID</access_key_id> -->
<!-- <secret_access_key>SECRET_ACCESS_KEY</secret_access_key> -->
<!-- <region>us-west-1</region> -->
<!-- <use_environment_credentials>false</use_environment_credentials> -->
<!-- <use_insecure_imds_request>false</use_insecure_imds_request> -->
<!-- <header>Authorization: Bearer SOME-TOKEN</header> -->

View File

@ -739,6 +739,7 @@ Configuration markup:
<endpoint>https://storage.yandexcloud.net/my-bucket/root-path/</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
<region></region>
<server_side_encryption_customer_key_base64>your_base64_encoded_customer_key</server_side_encryption_customer_key_base64>
<proxy>
<uri>http://proxy1</uri>
@ -764,6 +765,7 @@ Required parameters:
- `secret_access_key` — S3 secret access key.
Optional parameters:
- `region` — S3 region name.
- `use_environment_credentials` — Reads AWS credentials from the Environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN if they exist. Default value is `false`.
- `use_insecure_imds_request` — If set to `true`, S3 client will use insecure IMDS request while obtaining credentials from Amazon EC2 metadata. Default value is `false`.
- `proxy` — Proxy configuration for S3 endpoint. Each `uri` element inside `proxy` block should contain a proxy URL.

View File

@ -12,6 +12,9 @@ The result depends on the order of running the query, and is nondeterministic.
When using multiple `quantile*` functions with different levels in a query, the internal states are not combined (that is, the query works less efficiently than it could). In this case, use the [quantiles](../../../sql-reference/aggregate-functions/reference/quantiles.md#quantiles) function.
!!! note "Note"
Using `quantileTDigestWeighted` [is not recommended for tiny data sets](https://github.com/tdunning/t-digest/issues/167#issuecomment-828650275) and can lead to significat error. In this case, consider possibility of using [`quantileTDigest`](../../../sql-reference/aggregate-functions/reference/quantiletdigest.md) instead.
**Syntax**
``` sql

View File

@ -82,6 +82,7 @@ SELECT * FROM s3_engine_table LIMIT 2;
Необязательные настройки:
- `access_key_id` и `secret_access_key` — указывают учетные данные для использования с данной точкой приема запроса.
- `region` — название региона S3.
- `use_environment_credentials` — если `true`, S3-клиент будет пытаться получить учетные данные из переменных среды и метаданных Amazon EC2 для данной точки приема запроса. Значение по умолчанию - `false`.
- `header` — добавляет указанный HTTP-заголовок к запросу на заданную точку приема запроса. Может быть определен несколько раз.
- `server_side_encryption_customer_key_base64` — устанавливает необходимые заголовки для доступа к объектам S3 с шифрованием SSE-C.
@ -94,6 +95,7 @@ SELECT * FROM s3_engine_table LIMIT 2;
<endpoint>https://storage.yandexcloud.net/my-test-bucket-768/</endpoint>
<!-- <access_key_id>ACCESS_KEY_ID</access_key_id> -->
<!-- <secret_access_key>SECRET_ACCESS_KEY</secret_access_key> -->
<!-- <region>us-west-1</region> -->
<!-- <use_environment_credentials>false</use_environment_credentials> -->
<!-- <header>Authorization: Bearer SOME-TOKEN</header> -->
<!-- <server_side_encryption_customer_key_base64>BASE64-ENCODED-KEY</server_side_encryption_customer_key_base64> -->

View File

@ -727,6 +727,7 @@ SETTINGS storage_policy = 'moving_from_ssd_to_hdd'
<endpoint>https://storage.yandexcloud.net/my-bucket/root-path/</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
<region></region>
<proxy>
<uri>http://proxy1</uri>
<uri>http://proxy2</uri>
@ -753,6 +754,7 @@ SETTINGS storage_policy = 'moving_from_ssd_to_hdd'
Необязательные параметры:
- `region` — название региона S3.
- `use_environment_credentials` — признак, нужно ли считывать учетные данные AWS из сетевого окружения, а также из переменных окружения `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` и `AWS_SESSION_TOKEN`, если они есть. Значение по умолчанию: `false`.
- `use_insecure_imds_request` — признак, нужно ли использовать менее безопасное соединение при выполнении запроса к IMDS при получении учётных данных из метаданных Amazon EC2. Значение по умолчанию: `false`.
- `proxy` — конфигурация прокси-сервера для конечной точки S3. Каждый элемент `uri` внутри блока `proxy` должен содержать URL прокси-сервера.

View File

@ -12,6 +12,9 @@ toc_priority: 208
Внутренние состояния функций `quantile*` не объединяются, если они используются в одном запросе. Если вам необходимо вычислить квантили нескольких уровней, используйте функцию [quantiles](#quantiles), это повысит эффективность запроса.
!!! note "Примечание"
Использование `quantileTDigestWeighted` [не рекомендуется для небольших наборов данных](https://github.com/tdunning/t-digest/issues/167#issuecomment-828650275) и может привести к значительной ошибке. Рассмотрите возможность использования [`quantileTDigest`](../../../sql-reference/aggregate-functions/reference/quantiletdigest.md) в таких случаях.
**Синтаксис**
``` sql

View File

@ -13,6 +13,7 @@
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/NetException.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Environment.h>
#include <ext/scope_guard.h>
#include <common/defines.h>
#include <common/logger_useful.h>
@ -385,6 +386,11 @@ void Server::initialize(Poco::Util::Application & self)
{
BaseDaemon::initialize(self);
logger().information("starting up");
LOG_INFO(&logger(), "OS Name = {}, OS Version = {}, OS Architecture = {}",
Poco::Environment::osName(),
Poco::Environment::osVersion(),
Poco::Environment::osArchitecture());
}
std::string Server::getDefaultCorePath() const
@ -879,7 +885,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setMMappedFileCache(mmap_cache_size);
#if USE_EMBEDDED_COMPILER
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", 500);
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 1024;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size);
#endif

View File

@ -329,6 +329,8 @@
-->
<mmap_cache_size>1000</mmap_cache_size>
<!-- Cache size for compiled expressions.-->
<compiled_expression_cache_size>1073741824</compiled_expression_cache_size>
<!-- Path to data directory, with trailing slash. -->
<path>/var/lib/clickhouse/</path>

View File

@ -184,6 +184,7 @@ add_object_library(clickhouse_disks Disks)
add_object_library(clickhouse_interpreters Interpreters)
add_object_library(clickhouse_interpreters_mysql Interpreters/MySQL)
add_object_library(clickhouse_interpreters_clusterproxy Interpreters/ClusterProxy)
add_object_library(clickhouse_interpreters_jit Interpreters/JIT)
add_object_library(clickhouse_columns Columns)
add_object_library(clickhouse_storages Storages)
add_object_library(clickhouse_storages_distributed Storages/Distributed)

View File

@ -145,6 +145,11 @@ public:
return cells.size();
}
size_t maxSize() const
{
return max_size;
}
void reset()
{
std::lock_guard lock(mutex);

View File

@ -24,7 +24,10 @@ namespace
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
///
/// And it cannot be large, since otherwise it will not fit into PIPE_BUF.
constexpr size_t QUERY_ID_MAX_LEN = sizeof("00000000-0000-0000-0000-000000000000") - 1; // 36
/// The performance test query ids can be surprisingly long like
/// `aggregating_merge_tree_simple_aggregate_function_string.query100.profile100`,
/// so make some allowance for them as well.
constexpr size_t QUERY_ID_MAX_LEN = 128;
}
LazyPipeFDs pipe;

View File

@ -101,7 +101,7 @@ class IColumn;
M(Float, totals_auto_threshold, 0.5, "The threshold for totals_mode = 'auto'.", 0) \
\
M(Bool, allow_suspicious_low_cardinality_types, false, "In CREATE TABLE statement allows specifying LowCardinality modifier for types of small fixed size (8 or less). Enabling this may increase merge times and memory consumption.", 0) \
M(Bool, compile_expressions, false, "Compile some scalar functions and operators to native code.", 0) \
M(Bool, compile_expressions, true, "Compile some scalar functions and operators to native code.", 0) \
M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \
M(UInt64, group_by_two_level_threshold_bytes, 100000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \

View File

@ -5,15 +5,13 @@
#endif
#if USE_EMBEDDED_COMPILER
# include <DataTypes/DataTypeDate.h>
# include <DataTypes/DataTypeDateTime.h>
# include <DataTypes/DataTypeFixedString.h>
# include <DataTypes/DataTypeInterval.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeUUID.h>
# include <DataTypes/DataTypesNumber.h>
# include <Common/typeid_cast.h>
# include <Common/Exception.h>
# include <DataTypes/IDataType.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeFixedString.h>
# include <Columns/ColumnConst.h>
# include <Columns/ColumnNullable.h>
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-parameter"
@ -29,60 +27,56 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
template <typename... Ts>
static inline bool typeIsEither(const IDataType & type)
{
return (typeid_cast<const Ts *>(&type) || ...);
}
static inline bool typeIsSigned(const IDataType & type)
{
return typeIsEither<
DataTypeInt8, DataTypeInt16, DataTypeInt32, DataTypeInt64,
DataTypeFloat32, DataTypeFloat64, DataTypeInterval
>(type);
WhichDataType data_type(type);
return data_type.isNativeInt() || data_type.isFloat();
}
static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder, const IDataType & type)
{
if (auto * nullable = typeid_cast<const DataTypeNullable *>(&type))
WhichDataType data_type(type);
if (data_type.isNullable())
{
auto * wrapped = toNativeType(builder, *nullable->getNestedType());
const auto & data_type_nullable = static_cast<const DataTypeNullable&>(type);
auto * wrapped = toNativeType(builder, *data_type_nullable.getNestedType());
return wrapped ? llvm::StructType::get(wrapped, /* is null = */ builder.getInt1Ty()) : nullptr;
}
/// LLVM doesn't have unsigned types, it has unsigned instructions.
if (typeIsEither<DataTypeInt8, DataTypeUInt8>(type))
if (data_type.isInt8() || data_type.isUInt8())
return builder.getInt8Ty();
if (typeIsEither<DataTypeInt16, DataTypeUInt16, DataTypeDate>(type))
else if (data_type.isInt16() || data_type.isUInt16() || data_type.isDate())
return builder.getInt16Ty();
if (typeIsEither<DataTypeInt32, DataTypeUInt32, DataTypeDateTime>(type))
else if (data_type.isInt32() || data_type.isUInt32() || data_type.isDateTime())
return builder.getInt32Ty();
if (typeIsEither<DataTypeInt64, DataTypeUInt64, DataTypeInterval>(type))
else if (data_type.isInt64() || data_type.isUInt64())
return builder.getInt64Ty();
if (typeIsEither<DataTypeUUID>(type))
return builder.getInt128Ty();
if (typeIsEither<DataTypeFloat32>(type))
else if (data_type.isFloat32())
return builder.getFloatTy();
if (typeIsEither<DataTypeFloat64>(type))
else if (data_type.isFloat64())
return builder.getDoubleTy();
if (auto * fixed_string = typeid_cast<const DataTypeFixedString *>(&type))
return llvm::VectorType::get(builder.getInt8Ty(), fixed_string->getN());
else if (data_type.isFixedString())
{
const auto & data_type_fixed_string = static_cast<const DataTypeFixedString &>(type);
return llvm::VectorType::get(builder.getInt8Ty(), data_type_fixed_string.getN());
}
return nullptr;
}
static inline bool canBeNativeType(const IDataType & type)
{
if (auto * nullable = typeid_cast<const DataTypeNullable *>(&type))
return canBeNativeType(*nullable->getNestedType());
WhichDataType data_type(type);
return typeIsEither<DataTypeInt8, DataTypeUInt8>(type)
|| typeIsEither<DataTypeInt16, DataTypeUInt16, DataTypeDate>(type)
|| typeIsEither<DataTypeInt32, DataTypeUInt32, DataTypeDateTime>(type)
|| typeIsEither<DataTypeInt64, DataTypeUInt64, DataTypeInterval>(type)
|| typeIsEither<DataTypeUUID>(type)
|| typeIsEither<DataTypeFloat32>(type)
|| typeIsEither<DataTypeFloat64>(type)
|| typeid_cast<const DataTypeFixedString *>(&type);
if (data_type.isNullable())
{
const auto & data_type_nullable = static_cast<const DataTypeNullable&>(type);
return canBeNativeType(*data_type_nullable.getNestedType());
}
return data_type.isNativeInt() || data_type.isNativeUInt() || data_type.isFloat() || data_type.isFixedString() || data_type.isDate();
}
static inline llvm::Type * toNativeType(llvm::IRBuilderBase & builder, const DataTypePtr & type)
@ -102,45 +96,98 @@ static inline llvm::Value * nativeBoolCast(llvm::IRBuilder<> & b, const DataType
return b.CreateICmpNE(value, zero);
if (value->getType()->isFloatingPointTy())
return b.CreateFCmpONE(value, zero); /// QNaN is false
throw Exception("Cannot cast non-number " + from->getName() + " to bool", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot cast non-number {} to bool", from->getName());
}
static inline llvm::Value * nativeCast(llvm::IRBuilder<> & b, const DataTypePtr & from, llvm::Value * value, llvm::Type * to)
{
auto * n_from = value->getType();
if (n_from == to)
return value;
if (n_from->isIntegerTy() && to->isFloatingPointTy())
else if (n_from->isIntegerTy() && to->isFloatingPointTy())
return typeIsSigned(*from) ? b.CreateSIToFP(value, to) : b.CreateUIToFP(value, to);
if (n_from->isFloatingPointTy() && to->isIntegerTy())
else if (n_from->isFloatingPointTy() && to->isIntegerTy())
return typeIsSigned(*from) ? b.CreateFPToSI(value, to) : b.CreateFPToUI(value, to);
if (n_from->isIntegerTy() && to->isIntegerTy())
else if (n_from->isIntegerTy() && to->isIntegerTy())
return b.CreateIntCast(value, to, typeIsSigned(*from));
if (n_from->isFloatingPointTy() && to->isFloatingPointTy())
else if (n_from->isFloatingPointTy() && to->isFloatingPointTy())
return b.CreateFPCast(value, to);
throw Exception("Cannot cast " + from->getName() + " to requested type", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot cast {} to requested type", from->getName());
}
static inline llvm::Value * nativeCast(llvm::IRBuilder<> & b, const DataTypePtr & from, llvm::Value * value, const DataTypePtr & to)
{
auto * n_to = toNativeType(b, to);
if (value->getType() == n_to)
{
return value;
if (from->isNullable() && to->isNullable())
}
else if (from->isNullable() && to->isNullable())
{
auto * inner = nativeCast(b, removeNullable(from), b.CreateExtractValue(value, {0}), to);
return b.CreateInsertValue(inner, b.CreateExtractValue(value, {1}), {1});
}
if (from->isNullable())
else if (from->isNullable())
{
return nativeCast(b, removeNullable(from), b.CreateExtractValue(value, {0}), to);
if (to->isNullable())
}
else if (to->isNullable())
{
auto * inner = nativeCast(b, from, value, removeNullable(to));
return b.CreateInsertValue(llvm::Constant::getNullValue(n_to), inner, {0});
}
return nativeCast(b, from, value, n_to);
}
static inline llvm::Constant * getColumnNativeValue(llvm::IRBuilderBase & builder, const DataTypePtr & column_type, const IColumn & column, size_t index)
{
if (const auto * constant = typeid_cast<const ColumnConst *>(&column))
{
return getColumnNativeValue(builder, column_type, constant->getDataColumn(), 0);
}
WhichDataType column_data_type(column_type);
auto * type = toNativeType(builder, column_type);
if (!type || column.size() <= index)
return nullptr;
if (column_data_type.isNullable())
{
const auto & nullable_data_type = assert_cast<const DataTypeNullable &>(*column_type);
const auto & nullable_column = assert_cast<const ColumnNullable &>(column);
auto * value = getColumnNativeValue(builder, nullable_data_type.getNestedType(), nullable_column.getNestedColumn(), index);
auto * is_null = llvm::ConstantInt::get(type->getContainedType(1), nullable_column.isNullAt(index));
return value ? llvm::ConstantStruct::get(static_cast<llvm::StructType *>(type), value, is_null) : nullptr;
}
else if (column_data_type.isFloat32())
{
return llvm::ConstantFP::get(type, assert_cast<const ColumnVector<Float32> &>(column).getElement(index));
}
else if (column_data_type.isFloat64())
{
return llvm::ConstantFP::get(type, assert_cast<const ColumnVector<Float64> &>(column).getElement(index));
}
else if (column_data_type.isNativeUInt() || column_data_type.isDateOrDateTime())
{
return llvm::ConstantInt::get(type, column.getUInt(index));
}
else if (column_data_type.isNativeInt())
{
return llvm::ConstantInt::get(type, column.getInt(index));
}
return nullptr;
}
}
#endif

View File

@ -42,7 +42,6 @@ HTTPDictionarySource::HTTPDictionarySource(
, context(context_)
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
{
if (check_config)
context->getRemoteHostFilter().checkURL(Poco::URI(url));
@ -87,6 +86,16 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
credentials.setPassword(other.credentials.getPassword());
}
BlockInputStreamPtr HTTPDictionarySource::createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer_ptr)
{
Poco::URI uri(url);
String http_request_compression_method_str = http_buffer_ptr->getCompressionMethod();
auto in_ptr_wrapped
= wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod(uri.getPath(), http_request_compression_method_str));
auto input_stream = context->getInputFormat(format, *in_ptr_wrapped, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(in_ptr_wrapped));
}
void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri)
{
if (update_time != std::chrono::system_clock::from_time_t(0))
@ -109,10 +118,15 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
LOG_TRACE(log, "loadAll {}", toString());
Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts,
0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
auto input_stream = context->getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
uri,
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
timeouts,
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
header_entries);
return createWrappedBuffer(std::move(in_ptr));
}
BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
@ -121,10 +135,15 @@ BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
getUpdateFieldAndDate(uri);
LOG_TRACE(log, "loadUpdatedAll {}", uri.toString());
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts,
0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
auto input_stream = context->getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
uri,
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
timeouts,
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
header_entries);
return createWrappedBuffer(std::move(in_ptr));
}
BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -142,10 +161,15 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts,
0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
auto input_stream = context->getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
uri,
Poco::Net::HTTPRequest::HTTP_POST,
out_stream_callback,
timeouts,
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
header_entries);
return createWrappedBuffer(std::move(in_ptr));
}
BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
@ -163,10 +187,15 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
Poco::URI uri(url);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts,
0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
auto input_stream = context->getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
uri,
Poco::Net::HTTPRequest::HTTP_POST,
out_stream_callback,
timeouts,
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
header_entries);
return createWrappedBuffer(std::move(in_ptr));
}
bool HTTPDictionarySource::isModified() const
@ -203,16 +232,14 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
Block & sample_block,
ContextPtr context,
const std::string & /* default_database */,
bool check_config) -> DictionarySourcePtr
{
bool check_config) -> DictionarySourcePtr {
if (dict_struct.has_expressions)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `http` does not support attribute expressions");
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
return std::make_unique<HTTPDictionarySource>(
dict_struct, config, config_prefix + ".http",
sample_block, context_local_copy, check_config);
dict_struct, config, config_prefix + ".http", sample_block, context_local_copy, check_config);
};
factory.registerSource("http", create_table_source);
}

View File

@ -8,6 +8,7 @@
#include "DictionaryStructure.h"
#include "IDictionarySource.h"
#include <Interpreters/Context.h>
#include <IO/CompressionMethod.h>
namespace Poco
{
@ -53,6 +54,9 @@ public:
private:
void getUpdateFieldAndDate(Poco::URI & uri);
// wrap buffer using encoding from made request
BlockInputStreamPtr createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer);
Poco::Logger * log;
LocalDateTime getLastModification() const;
@ -70,3 +74,4 @@ private:
};
}

View File

@ -115,6 +115,7 @@ std::shared_ptr<Aws::S3::S3Client>
getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextConstPtr context)
{
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
config.getString(config_prefix + ".region", ""),
context->getRemoteHostFilter(), context->getGlobalContext()->getSettingsRef().s3_max_redirects);
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));

View File

@ -1329,7 +1329,7 @@ public:
});
}
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, ValuePlaceholders values) const override
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, Values values) const override
{
assert(2 == types.size() && 2 == values.size());
@ -1346,8 +1346,8 @@ public:
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto type = std::make_shared<ResultDataType>();
auto * lval = nativeCast(b, types[0], values[0](), type);
auto * rval = nativeCast(b, types[1], values[1](), type);
auto * lval = nativeCast(b, types[0], values[0], type);
auto * rval = nativeCast(b, types[1], values[1], type);
result = OpSpec::compile(b, lval, rval, std::is_signed_v<typename ResultDataType::FieldType>);
return true;
}

View File

@ -10,7 +10,6 @@
namespace DB
{
template <bool null_is_false>
class FunctionIfBase : public IFunction
{
#if USE_EMBEDDED_COMPILER
@ -40,46 +39,40 @@ public:
return true;
}
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, ValuePlaceholders values) const override
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, Values values) const override
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto type = getReturnTypeImpl(types);
llvm::Value * null = nullptr;
if (!null_is_false && type->isNullable())
null = b.CreateInsertValue(llvm::Constant::getNullValue(toNativeType(b, type)), b.getTrue(), {1});
auto return_type = getReturnTypeImpl(types);
auto * head = b.GetInsertBlock();
auto * join = llvm::BasicBlock::Create(head->getContext(), "", head->getParent());
auto * join = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent());
std::vector<std::pair<llvm::BasicBlock *, llvm::Value *>> returns;
for (size_t i = 0; i + 1 < types.size(); i += 2)
{
auto * then = llvm::BasicBlock::Create(head->getContext(), "", head->getParent());
auto * next = llvm::BasicBlock::Create(head->getContext(), "", head->getParent());
auto * cond = values[i]();
if (!null_is_false && types[i]->isNullable())
{
auto * nonnull = llvm::BasicBlock::Create(head->getContext(), "", head->getParent());
returns.emplace_back(b.GetInsertBlock(), null);
b.CreateCondBr(b.CreateExtractValue(cond, {1}), join, nonnull);
b.SetInsertPoint(nonnull);
b.CreateCondBr(nativeBoolCast(b, removeNullable(types[i]), b.CreateExtractValue(cond, {0})), then, next);
}
else
{
auto * then = llvm::BasicBlock::Create(head->getContext(), "then_" + std::to_string(i), head->getParent());
auto * next = llvm::BasicBlock::Create(head->getContext(), "next_" + std::to_string(i), head->getParent());
auto * cond = values[i];
b.CreateCondBr(nativeBoolCast(b, types[i], cond), then, next);
}
b.SetInsertPoint(then);
auto * value = nativeCast(b, types[i + 1], values[i + 1](), type);
auto * value = nativeCast(b, types[i + 1], values[i + 1], return_type);
returns.emplace_back(b.GetInsertBlock(), value);
b.CreateBr(join);
b.SetInsertPoint(next);
}
auto * value = nativeCast(b, types.back(), values.back()(), type);
returns.emplace_back(b.GetInsertBlock(), value);
auto * else_value = nativeCast(b, types.back(), values.back(), return_type);
returns.emplace_back(b.GetInsertBlock(), else_value);
b.CreateBr(join);
b.SetInsertPoint(join);
auto * phi = b.CreatePHI(toNativeType(b, type), returns.size());
for (const auto & r : returns)
phi->addIncoming(r.second, r.first);
auto * phi = b.CreatePHI(toNativeType(b, return_type), returns.size());
for (const auto & [block, value] : returns)
phi->addIncoming(value, block);
return phi;
}
#endif

View File

@ -244,7 +244,7 @@ public:
});
}
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, ValuePlaceholders values) const override
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, Values values) const override
{
assert(1 == types.size() && 1 == values.size());
@ -261,7 +261,7 @@ public:
if constexpr (!std::is_same_v<T1, InvalidType> && !IsDataTypeDecimal<DataType> && Op<T0>::compilable)
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto * v = nativeCast(b, types[0], values[0](), std::make_shared<DataTypeNumber<T1>>());
auto * v = nativeCast(b, types[0], values[0], std::make_shared<DataTypeNumber<T1>>());
result = Op<T0>::compile(b, v, is_signed_v<T1>);
return true;
}

View File

@ -1240,23 +1240,27 @@ public:
if (2 != types.size())
return false;
auto isBigInteger = &typeIsEither<DataTypeInt64, DataTypeUInt64, DataTypeUUID>;
auto isFloatingPoint = &typeIsEither<DataTypeFloat32, DataTypeFloat64>;
if ((isBigInteger(*types[0]) && isFloatingPoint(*types[1]))
|| (isBigInteger(*types[1]) && isFloatingPoint(*types[0]))
|| (WhichDataType(types[0]).isDate() && WhichDataType(types[1]).isDateTime())
|| (WhichDataType(types[1]).isDate() && WhichDataType(types[0]).isDateTime()))
WhichDataType data_type_lhs(types[0]);
WhichDataType data_type_rhs(types[1]);
auto is_big_integer = [](WhichDataType type) { return type.isUInt64() || type.isInt64(); };
if ((is_big_integer(data_type_lhs) && data_type_rhs.isFloat())
|| (is_big_integer(data_type_rhs) && data_type_lhs.isFloat())
|| (data_type_lhs.isDate() && data_type_rhs.isDateTime())
|| (data_type_rhs.isDate() && data_type_lhs.isDateTime()))
return false; /// TODO: implement (double, int_N where N > double's mantissa width)
return isCompilableType(types[0]) && isCompilableType(types[1]);
}
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, ValuePlaceholders values) const override
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, Values values) const override
{
assert(2 == types.size() && 2 == values.size());
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto * x = values[0]();
auto * y = values[1]();
auto * x = values[0];
auto * y = values[1];
if (!types[0]->equals(*types[1]))
{
llvm::Type * common;

View File

@ -794,8 +794,11 @@ private:
return result_type;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
if (input_rows_count == 0)
return result_type->createColumn();
/** We call dictHas function to get which map is key presented in dictionary.
For key that presented in dictionary dict has result for that key index value will be 1. Otherwise 0.
We invert result, and then for key that is not presented in dictionary value will be 1. Otherwise 0.
@ -817,15 +820,15 @@ private:
for (auto & key : is_key_in_dictionary_data)
key = !key;
auto result_type = dictionary_get_func_impl.getReturnTypeImpl(arguments);
auto dictionary_get_result_column = dictionary_get_func_impl.executeImpl(arguments, result_type, input_rows_count);
auto dictionary_get_result_type = dictionary_get_func_impl.getReturnTypeImpl(arguments);
auto dictionary_get_result_column = dictionary_get_func_impl.executeImpl(arguments, dictionary_get_result_type, input_rows_count);
ColumnPtr result;
WhichDataType result_data_type(result_type);
WhichDataType dictionary_get_result_data_type(dictionary_get_result_type);
auto dictionary_get_result_column_mutable = dictionary_get_result_column->assumeMutable();
if (result_data_type.isTuple())
if (dictionary_get_result_data_type.isTuple())
{
ColumnTuple & column_tuple = assert_cast<ColumnTuple &>(*dictionary_get_result_column_mutable);

View File

@ -3,6 +3,7 @@
#include <common/types.h>
#include <Core/Defines.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunctionImpl.h>
#include <IO/WriteHelpers.h>
#include <type_traits>
@ -159,16 +160,16 @@ public:
#if USE_EMBEDDED_COMPILER
bool isCompilableImpl(const DataTypes &) const override { return useDefaultImplementationForNulls(); }
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, ValuePlaceholders values) const override
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, Values values) const override
{
assert(!types.empty() && !values.empty());
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
if constexpr (!Impl::isSaturable())
{
auto * result = nativeBoolCast(b, types[0], values[0]());
auto * result = nativeBoolCast(b, types[0], values[0]);
for (size_t i = 1; i < types.size(); i++)
result = Impl::apply(b, result, nativeBoolCast(b, types[i], values[i]()));
result = Impl::apply(b, result, nativeBoolCast(b, types[i], values[i]));
return b.CreateSelect(result, b.getInt8(1), b.getInt8(0));
}
constexpr bool breakOnTrue = Impl::isSaturatedValue(true);
@ -179,7 +180,7 @@ public:
for (size_t i = 0; i < types.size(); i++)
{
b.SetInsertPoint(next);
auto * value = values[i]();
auto * value = values[i];
auto * truth = nativeBoolCast(b, types[i], value);
if (!types[i]->equals(DataTypeUInt8{}))
value = b.CreateSelect(truth, b.getInt8(1), b.getInt8(0));
@ -222,10 +223,10 @@ public:
#if USE_EMBEDDED_COMPILER
bool isCompilableImpl(const DataTypes &) const override { return true; }
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, ValuePlaceholders values) const override
llvm::Value * compileImpl(llvm::IRBuilderBase & builder, const DataTypes & types, Values values) const override
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
return b.CreateSelect(Impl<UInt8>::apply(b, nativeBoolCast(b, types[0], values[0]())), b.getInt8(1), b.getInt8(0));
return b.CreateSelect(Impl<UInt8>::apply(b, nativeBoolCast(b, types[0], values[0])), b.getInt8(1), b.getInt8(0));
}
#endif
};

View File

@ -515,43 +515,49 @@ bool IFunction::isCompilable(const DataTypes & arguments) const
return isCompilableImpl(arguments);
}
llvm::Value * IFunction::compile(llvm::IRBuilderBase & builder, const DataTypes & arguments, ValuePlaceholders values) const
llvm::Value * IFunction::compile(llvm::IRBuilderBase & builder, const DataTypes & arguments, Values values) const
{
if (useDefaultImplementationForNulls())
auto denulled_arguments = removeNullables(arguments);
if (useDefaultImplementationForNulls() && denulled_arguments)
{
if (auto denulled = removeNullables(arguments))
{
/// FIXME: when only one column is nullable, this can actually be slower than the non-jitted version
/// because this involves copying the null map while `wrapInNullable` reuses it.
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto * fail = llvm::BasicBlock::Create(b.GetInsertBlock()->getContext(), "", b.GetInsertBlock()->getParent());
auto * join = llvm::BasicBlock::Create(b.GetInsertBlock()->getContext(), "", b.GetInsertBlock()->getParent());
auto * zero = llvm::Constant::getNullValue(toNativeType(b, makeNullable(getReturnTypeImpl(*denulled))));
for (size_t i = 0; i < arguments.size(); i++)
std::vector<llvm::Value*> unwrapped_values;
std::vector<llvm::Value*> is_null_values;
unwrapped_values.reserve(arguments.size());
is_null_values.reserve(arguments.size());
for (size_t i = 0; i < arguments.size(); ++i)
{
if (!arguments[i]->isNullable())
continue;
/// Would be nice to evaluate all this lazily, but that'd change semantics: if only unevaluated
/// arguments happen to contain NULLs, the return value would not be NULL, though it should be.
auto * value = values[i]();
auto * ok = llvm::BasicBlock::Create(b.GetInsertBlock()->getContext(), "", b.GetInsertBlock()->getParent());
b.CreateCondBr(b.CreateExtractValue(value, {1}), fail, ok);
b.SetInsertPoint(ok);
values[i] = [value = b.CreateExtractValue(value, {0})]() { return value; };
auto * value = values[i];
WhichDataType data_type(arguments[i]);
if (data_type.isNullable())
{
unwrapped_values.emplace_back(b.CreateExtractValue(value, {0}));
is_null_values.emplace_back(b.CreateExtractValue(value, {1}));
}
auto * result = b.CreateInsertValue(zero, compileImpl(builder, *denulled, std::move(values)), {0});
auto * result_columns = b.GetInsertBlock();
b.CreateBr(join);
b.SetInsertPoint(fail);
auto * null = b.CreateInsertValue(zero, b.getTrue(), {1});
b.CreateBr(join);
b.SetInsertPoint(join);
auto * phi = b.CreatePHI(result->getType(), 2);
phi->addIncoming(result, result_columns);
phi->addIncoming(null, fail);
return phi;
else
{
unwrapped_values.emplace_back(value);
}
}
auto * result = compileImpl(builder, *denulled_arguments, unwrapped_values);
auto * nullable_structure_type = toNativeType(b, makeNullable(getReturnTypeImpl(*denulled_arguments)));
auto * nullable_structure_value = llvm::Constant::getNullValue(nullable_structure_type);
auto * nullable_structure_with_result_value = b.CreateInsertValue(nullable_structure_value, result, {0});
auto * nullable_structure_result_null = b.CreateExtractValue(nullable_structure_with_result_value, {1});
for (auto * is_null_value : is_null_values)
nullable_structure_result_null = b.CreateOr(nullable_structure_result_null, is_null_value);
return b.CreateInsertValue(nullable_structure_with_result_value, nullable_structure_result_null, {1});
}
return compileImpl(builder, arguments, std::move(values));
}

View File

@ -54,7 +54,7 @@ public:
using ExecutableFunctionPtr = std::shared_ptr<IExecutableFunction>;
using ValuePlaceholders = std::vector<std::function<llvm::Value * ()>>;
using Values = std::vector<llvm::Value *>;
/// Function with known arguments and return type (when the specific overload was chosen).
/// It is also the point where all function-specific properties are known.
@ -90,7 +90,7 @@ public:
* templates with default arguments is impossible and including LLVM in such a generic header
* as this one is a major pain.
*/
virtual llvm::Value * compile(llvm::IRBuilderBase & /*builder*/, ValuePlaceholders /*values*/) const
virtual llvm::Value * compile(llvm::IRBuilderBase & /*builder*/, Values /*values*/) const
{
throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -54,7 +54,7 @@ public:
bool isCompilable() const final { return impl->isCompilable(); }
llvm::Value * compile(llvm::IRBuilderBase & builder, ValuePlaceholders values) const override
llvm::Value * compile(llvm::IRBuilderBase & builder, Values values) const override
{
return impl->compile(builder, std::move(values));
}
@ -183,7 +183,10 @@ public:
bool isCompilable() const override { return function->isCompilable(getArgumentTypes()); }
llvm::Value * compile(llvm::IRBuilderBase & builder, ValuePlaceholders values) const override { return function->compile(builder, getArgumentTypes(), std::move(values)); }
llvm::Value * compile(llvm::IRBuilderBase & builder, Values values) const override
{
return function->compile(builder, getArgumentTypes(), std::move(values));
}
#endif

View File

@ -96,7 +96,7 @@ public:
virtual bool isCompilable() const { return false; }
virtual llvm::Value * compile(llvm::IRBuilderBase & /*builder*/, ValuePlaceholders /*values*/) const
virtual llvm::Value * compile(llvm::IRBuilderBase & /*builder*/, Values /*values*/) const
{
throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED);
}
@ -237,20 +237,6 @@ public:
*/
virtual bool canBeExecutedOnDefaultArguments() const { return true; }
#if USE_EMBEDDED_COMPILER
virtual bool isCompilable() const
{
throw Exception("isCompilable without explicit types is not implemented for IFunction", ErrorCodes::NOT_IMPLEMENTED);
}
virtual llvm::Value * compile(llvm::IRBuilderBase & /*builder*/, ValuePlaceholders /*values*/) const
{
throw Exception("compile without explicit types is not implemented for IFunction", ErrorCodes::NOT_IMPLEMENTED);
}
#endif
/// Properties from IFunctionBase (see IFunction.h)
virtual bool isSuitableForConstantFolding() const { return true; }
virtual ColumnPtr getResultIfAlwaysReturnsConstantAndHasArguments(const ColumnsWithTypeAndName & /*arguments*/) const { return nullptr; }
@ -298,7 +284,7 @@ public:
bool isCompilable(const DataTypes & arguments) const;
llvm::Value * compile(llvm::IRBuilderBase &, const DataTypes & arguments, ValuePlaceholders values) const;
llvm::Value * compile(llvm::IRBuilderBase &, const DataTypes & arguments, Values values) const;
#endif
@ -308,7 +294,7 @@ protected:
virtual bool isCompilableImpl(const DataTypes &) const { return false; }
virtual llvm::Value * compileImpl(llvm::IRBuilderBase &, const DataTypes &, ValuePlaceholders) const
virtual llvm::Value * compileImpl(llvm::IRBuilderBase &, const DataTypes &, Values) const
{
throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -26,10 +26,13 @@ struct GreatestBaseImpl
static inline llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool is_signed)
{
if (!left->getType()->isIntegerTy())
/// XXX maxnum is basically fmax(), it may or may not match whatever apply() does
/// XXX CreateMaxNum is broken on LLVM 5.0 and 6.0 (generates minnum instead; fixed in 7)
return b.CreateBinaryIntrinsic(llvm::Intrinsic::maxnum, left, right);
return b.CreateSelect(is_signed ? b.CreateICmpSGT(left, right) : b.CreateICmpUGT(left, right), left, right);
{
/// Follows the IEEE-754 semantics for maxNum except for the handling of signaling NaNs. This matches the behavior of libc fmax.
return b.CreateMaxNum(left, right);
}
auto * compare_value = is_signed ? b.CreateICmpSGT(left, right) : b.CreateICmpUGT(left, right);
return b.CreateSelect(compare_value, left, right);
}
#endif
};

View File

@ -169,7 +169,7 @@ public:
};
class FunctionIf : public FunctionIfBase</*null_is_false=*/false>
class FunctionIf : public FunctionIfBase
{
public:
static constexpr auto name = "if";

View File

@ -26,9 +26,13 @@ struct LeastBaseImpl
static inline llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool is_signed)
{
if (!left->getType()->isIntegerTy())
/// XXX minnum is basically fmin(), it may or may not match whatever apply() does
{
/// Follows the IEEE-754 semantics for minNum, except for handling of signaling NaNs. This matchs the behavior of libc fmin.
return b.CreateMinNum(left, right);
return b.CreateSelect(is_signed ? b.CreateICmpSLT(left, right) : b.CreateICmpULT(left, right), left, right);
}
auto * compare_value = is_signed ? b.CreateICmpSLT(left, right) : b.CreateICmpULT(left, right);
return b.CreateSelect(compare_value, left, right);
}
#endif
};

View File

@ -31,7 +31,7 @@ namespace
///
/// Additionally the arguments, conditions or branches, support nullable types
/// and the NULL value, with a NULL condition treated as false.
class FunctionMultiIf final : public FunctionIfBase</*null_is_false=*/true>
class FunctionMultiIf final : public FunctionIfBase
{
public:
static constexpr auto name = "multiIf";

View File

@ -46,7 +46,6 @@ std::string toContentEncodingName(CompressionMethod method)
__builtin_unreachable();
}
CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint)
{
std::string file_extension;

View File

@ -92,6 +92,7 @@ namespace detail
protected:
Poco::URI uri;
std::string method;
std::string content_encoding;
UpdatableSessionPtr session;
std::istream * istr; /// owned by session
@ -137,6 +138,7 @@ namespace detail
istr = receiveResponse(*sess, request, response, true);
response.getCookies(cookies);
content_encoding = response.get("Content-Encoding", "");
return istr;
}
@ -230,6 +232,11 @@ namespace detail
/// Some data maybe already read
next_callback(count());
}
const std::string & getCompressionMethod() const
{
return content_encoding;
}
};
}

View File

@ -47,9 +47,11 @@ namespace DB::S3
{
PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
const String & force_region_,
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_)
: remote_host_filter(remote_host_filter_)
: force_region(force_region_)
, remote_host_filter(remote_host_filter_)
, s3_max_redirects(s3_max_redirects_)
{
}
@ -63,6 +65,8 @@ void PocoHTTPClientConfiguration::updateSchemeAndRegion()
if (uri.getScheme() == "http")
scheme = Aws::Http::Scheme::HTTP;
if (force_region.empty())
{
String matched_region;
if (re2::RE2::PartialMatch(uri.getHost(), region_pattern, &matched_region))
{
@ -71,9 +75,15 @@ void PocoHTTPClientConfiguration::updateSchemeAndRegion()
}
else
{
/// In global mode AWS C++ SDK send `us-east-1` but accept switching to another one if being suggested.
region = Aws::Region::AWS_GLOBAL;
}
}
else
{
region = force_region;
}
}
}
@ -176,7 +186,32 @@ void PocoHTTPClient::makeRequestInternal(
Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);
poco_request.setURI(target_uri.getPathAndQuery());
/** Aws::Http::URI will encode URL in appropriate way for AWS S3 server.
* Poco::URI also does that correctly but it's not compatible with AWS.
* For example, `+` symbol will not be converted to `%2B` by Poco and would
* be received as space symbol.
*
* References:
* https://github.com/aws/aws-sdk-java/issues/1946
* https://forums.aws.amazon.com/thread.jspa?threadID=55746
*
* Example:
* Suppose we are requesting a file: abc+def.txt
* To correctly do it, we need to construct an URL containing either:
* - abc%2Bdef.txt
* this is also technically correct:
* - abc+def.txt
* but AWS servers don't support it properly, interpreting plus character as whitespace
* although it is in path part, not in query string.
* e.g. this is not correct:
* - abc%20def.txt
*
* Poco will keep plus character as is (which is correct) while AWS servers will treat it as whitespace, which is not what is intended.
* To overcome this limitation, we encode URL with "Aws::Http::URI" and then pass already prepared URL to Poco.
*/
Aws::Http::URI aws_target_uri(uri);
poco_request.setURI(aws_target_uri.GetPath() + aws_target_uri.GetQueryString());
switch (request.GetMethod())
{

View File

@ -29,13 +29,14 @@ class ClientFactory;
struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
{
String force_region;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;
void updateSchemeAndRegion();
private:
PocoHTTPClientConfiguration(const RemoteHostFilter & remote_host_filter_, unsigned int s3_max_redirects_);
PocoHTTPClientConfiguration(const String & force_region_, const RemoteHostFilter & remote_host_filter_, unsigned int s3_max_redirects_);
/// Constructor of Aws::Client::ClientConfiguration must be called after AWS SDK initialization.
friend ClientFactory;

View File

@ -410,7 +410,7 @@ public:
}
else if (Aws::Utils::StringUtils::ToLower(ec2_metadata_disabled.c_str()) != "true")
{
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(configuration.remote_host_filter, configuration.s3_max_redirects);
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(configuration.region, configuration.remote_host_filter, configuration.s3_max_redirects);
/// See MakeDefaultHttpResourceClientConfiguration().
/// This is part of EC2 metadata client, but unfortunately it can't be accessed from outside
@ -590,10 +590,11 @@ namespace S3
}
PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
const String & force_region,
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects)
{
return PocoHTTPClientConfiguration(remote_host_filter, s3_max_redirects);
return PocoHTTPClientConfiguration(force_region, remote_host_filter, s3_max_redirects);
}
URI::URI(const Poco::URI & uri_)

View File

@ -42,6 +42,7 @@ public:
bool use_insecure_imds_request);
PocoHTTPClientConfiguration createClientConfiguration(
const String & force_region,
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects);

View File

@ -681,9 +681,13 @@ std::string ActionsDAG::dumpDAG() const
out << " " << (node.column ? node.column->getName() : "(no column)");
out << " " << (node.result_type ? node.result_type->getName() : "(no type)");
out << " " << (!node.result_name.empty() ? node.result_name : "(no name)");
if (node.function_base)
out << " [" << node.function_base->getName() << "]";
if (node.is_function_compiled)
out << " [compiled]";
out << "\n";
}
@ -1195,10 +1199,9 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const
{
struct Frame
{
const Node * node;
const Node * node = nullptr;
size_t next_child_to_visit = 0;
};
@ -1294,7 +1297,7 @@ ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordere
struct Frame
{
const ActionsDAG::Node * node;
const ActionsDAG::Node * node = nullptr;
bool is_predicate = false;
size_t next_child_to_visit = 0;
size_t num_allowed_children = 0;
@ -1410,7 +1413,7 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
struct Frame
{
const ActionsDAG::Node * node;
const ActionsDAG::Node * node = nullptr;
size_t next_child_to_visit = 0;
};

View File

@ -837,7 +837,12 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
data_variants.init(data_variants.type);
data_variants.aggregates_pools = Arenas(1, std::make_shared<Arena>());
data_variants.aggregates_pool = data_variants.aggregates_pools.back().get();
data_variants.without_key = nullptr;
if (params.overflow_row || data_variants.type == AggregatedDataVariants::Type::without_key)
{
AggregateDataPtr place = data_variants.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
data_variants.without_key = place;
}
block_out.flush();
compressed_buf.next();
@ -1297,6 +1302,9 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
{
AggregatedDataWithoutKey & data = data_variants.without_key;
if (!data)
throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
if (!final_)
{
for (size_t i = 0; i < params.aggregates_size; ++i)

View File

@ -1,9 +1,10 @@
#include <Interpreters/AsynchronousMetricLog.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/AsynchronousMetrics.h>
@ -17,7 +18,7 @@ Block AsynchronousMetricLogElement::createBlock()
columns.emplace_back(std::make_shared<DataTypeDate>(), "event_date");
columns.emplace_back(std::make_shared<DataTypeDateTime>(), "event_time");
columns.emplace_back(std::make_shared<DataTypeDateTime64>(6), "event_time_microseconds");
columns.emplace_back(std::make_shared<DataTypeString>(), "name");
columns.emplace_back(std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "name");
columns.emplace_back(std::make_shared<DataTypeFloat64>(), "value");
return Block(columns);

View File

@ -197,8 +197,11 @@ void AsynchronousMetrics::update()
#if USE_EMBEDDED_COMPILER
{
if (auto * compiled_expression_cache = CompiledExpressionCacheFactory::instance().tryGetCache())
{
new_values["CompiledExpressionCacheBytes"] = compiled_expression_cache->weight();
new_values["CompiledExpressionCacheCount"] = compiled_expression_cache->count();
}
}
#endif
new_values["Uptime"] = getContext()->getUptimeSeconds();

View File

@ -450,6 +450,12 @@ struct ContextSharedPart
/// TODO: Get rid of this.
delete_system_logs = std::move(system_logs);
#if USE_EMBEDDED_COMPILER
if (auto * cache = CompiledExpressionCacheFactory::instance().tryGetCache())
cache->reset();
#endif
embedded_dictionaries.reset();
external_dictionaries_loader.reset();
models_repository_guard.reset();

View File

@ -116,10 +116,6 @@ struct BackgroundTaskSchedulingSettings;
class ZooKeeperMetadataTransaction;
using ZooKeeperMetadataTransactionPtr = std::shared_ptr<ZooKeeperMetadataTransaction>;
#if USE_EMBEDDED_COMPILER
class CompiledExpressionCache;
#endif
/// Callback for external tables initializer
using ExternalTablesInitializer = std::function<void(ContextPtr)>;

File diff suppressed because it is too large Load Diff

View File

@ -5,10 +5,7 @@
#endif
#if USE_EMBEDDED_COMPILER
# include <set>
# include <Functions/IFunctionImpl.h>
# include <Interpreters/Context.h>
# include <Interpreters/ExpressionActions.h>
# include <Common/LRUCache.h>
# include <Common/HashTable/Hash.h>
@ -16,88 +13,27 @@
namespace DB
{
using CompilableExpression = std::function<llvm::Value * (llvm::IRBuilderBase &, const ValuePlaceholders &)>;
struct LLVMModuleState;
class LLVMFunction : public IFunctionBaseImpl
struct CompiledFunction
{
std::string name;
DataTypes arg_types;
std::vector<FunctionBasePtr> originals;
CompilableExpression expression;
std::unique_ptr<LLVMModuleState> module_state;
public:
/// LLVMFunction is a compiled part of ActionsDAG.
/// We store this part as independent DAG with minial required information to compile it.
struct CompileNode
{
enum class NodeType
{
INPUT = 0,
CONSTANT = 1,
FUNCTION = 2,
};
NodeType type;
DataTypePtr result_type;
/// For CONSTANT
ColumnPtr column;
/// For FUNCTION
FunctionBasePtr function;
std::vector<size_t> arguments;
size_t compiled_size;
};
/// DAG is represented as list of nodes stored in in-order traverse order.
/// Expression (a + 1) + (b + 1) will be represented like chain: a, 1, a + 1, b, b + 1, (a + 1) + (b + 1).
struct CompileDAG : public std::vector<CompileNode>
struct CompiledFunctionWeightFunction
{
std::string dump() const;
UInt128 hash() const;
};
explicit LLVMFunction(const CompileDAG & dag);
bool isCompilable() const override { return true; }
llvm::Value * compile(llvm::IRBuilderBase & builder, ValuePlaceholders values) const override;
String getName() const override { return name; }
const DataTypes & getArgumentTypes() const override { return arg_types; }
const DataTypePtr & getResultType() const override { return originals.back()->getResultType(); }
ExecutableFunctionImplPtr prepare(const ColumnsWithTypeAndName &) const override;
bool isDeterministic() const override;
bool isDeterministicInScopeOfQuery() const override;
bool isSuitableForConstantFolding() const override;
bool isInjective(const ColumnsWithTypeAndName & sample_block) const override;
bool hasInformationAboutMonotonicity() const override;
Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override;
const LLVMModuleState * getLLVMModuleState() const { return module_state.get(); }
size_t operator()(const CompiledFunction & compiled_function) const
{
return compiled_function.compiled_size;
}
};
/** This child of LRUCache breaks one of it's invariants: total weight may be changed after insertion.
* We have to do so, because we don't known real memory consumption of generated LLVM code for every function.
*/
class CompiledExpressionCache : public LRUCache<UInt128, IFunctionBase, UInt128Hash>
class CompiledExpressionCache : public LRUCache<UInt128, CompiledFunction, UInt128Hash, CompiledFunctionWeightFunction>
{
public:
using Base = LRUCache<UInt128, IFunctionBase, UInt128Hash>;
using Base = LRUCache<UInt128, CompiledFunction, UInt128Hash, CompiledFunctionWeightFunction>;
using Base::Base;
};

View File

@ -0,0 +1,382 @@
#include "CHJIT.h"
#if USE_EMBEDDED_COMPILER
#include <llvm/Analysis/TargetTransformInfo.h>
#include <llvm/IR/BasicBlock.h>
#include <llvm/IR/DataLayout.h>
#include <llvm/IR/DerivedTypes.h>
#include <llvm/IR/Function.h>
#include <llvm/IR/IRBuilder.h>
#include <llvm/IR/Mangler.h>
#include <llvm/IR/Type.h>
#include <llvm/IR/LegacyPassManager.h>
#include <llvm/ExecutionEngine/JITSymbol.h>
#include <llvm/ExecutionEngine/SectionMemoryManager.h>
#include <llvm/ExecutionEngine/JITEventListener.h>
#include <llvm/MC/SubtargetFeature.h>
#include <llvm/Support/DynamicLibrary.h>
#include <llvm/Support/Host.h>
#include <llvm/Support/TargetRegistry.h>
#include <llvm/Support/TargetSelect.h>
#include <llvm/Transforms/IPO/PassManagerBuilder.h>
#include <llvm/Support/SmallVectorMemoryBuffer.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPILE_CODE;
extern const int LOGICAL_ERROR;
}
/** Simple module to object file compiler.
* Result object cannot be used as machine code directly, it should be passed to linker.
*/
class JITCompiler
{
public:
explicit JITCompiler(llvm::TargetMachine &target_machine_)
: target_machine(target_machine_)
{
}
std::unique_ptr<llvm::MemoryBuffer> compile(llvm::Module & module)
{
auto materialize_error = module.materializeAll();
if (materialize_error)
{
std::string error_message;
handleAllErrors(std::move(materialize_error),
[&](const llvm::ErrorInfoBase & error_info) { error_message = error_info.message(); });
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "Cannot materialize module: {}", error_message);
}
llvm::SmallVector<char, 4096> object_buffer;
llvm::raw_svector_ostream object_stream(object_buffer);
llvm::legacy::PassManager pass_manager;
llvm::MCContext * machine_code_context = nullptr;
if (target_machine.addPassesToEmitMC(pass_manager, machine_code_context, object_stream))
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "MachineCode is not supported for the platform");
pass_manager.run(module);
std::unique_ptr<llvm::MemoryBuffer> compiled_object_buffer = std::make_unique<llvm::SmallVectorMemoryBuffer>(
std::move(object_buffer), "<in memory object compiled from " + module.getModuleIdentifier() + ">");
return compiled_object_buffer;
}
~JITCompiler() = default;
private:
llvm::TargetMachine & target_machine;
};
/** MemoryManager for module.
* Keep total allocated size during RuntimeDyld linker execution.
* Actual compiled code memory is stored in llvm::SectionMemoryManager member, we cannot use ZeroBase optimization here
* because it is required for llvm::SectionMemoryManager::MemoryMapper to live longer than llvm::SectionMemoryManager.
*/
class JITModuleMemoryManager
{
class DefaultMMapper final : public llvm::SectionMemoryManager::MemoryMapper
{
public:
llvm::sys::MemoryBlock allocateMappedMemory(
llvm::SectionMemoryManager::AllocationPurpose Purpose [[maybe_unused]],
size_t NumBytes,
const llvm::sys::MemoryBlock * const NearBlock,
unsigned Flags,
std::error_code & EC) override
{
auto allocated_memory_block = llvm::sys::Memory::allocateMappedMemory(NumBytes, NearBlock, Flags, EC);
allocated_size += allocated_memory_block.allocatedSize();
return allocated_memory_block;
}
std::error_code protectMappedMemory(const llvm::sys::MemoryBlock & Block, unsigned Flags) override
{
return llvm::sys::Memory::protectMappedMemory(Block, Flags);
}
std::error_code releaseMappedMemory(llvm::sys::MemoryBlock & M) override { return llvm::sys::Memory::releaseMappedMemory(M); }
size_t allocated_size = 0;
};
public:
JITModuleMemoryManager() : manager(&mmaper) { }
inline size_t getAllocatedSize() const { return mmaper.allocated_size; }
inline llvm::SectionMemoryManager & getManager() { return manager; }
private:
DefaultMMapper mmaper;
llvm::SectionMemoryManager manager;
};
class JITSymbolResolver : public llvm::LegacyJITSymbolResolver
{
public:
llvm::JITSymbol findSymbolInLogicalDylib(const std::string &) override { return nullptr; }
llvm::JITSymbol findSymbol(const std::string & Name) override
{
auto address_it = symbol_name_to_symbol_address.find(Name);
if (address_it == symbol_name_to_symbol_address.end())
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "Could not find symbol {}", Name);
uint64_t symbol_address = reinterpret_cast<uint64_t>(address_it->second);
auto jit_symbol = llvm::JITSymbol(symbol_address, llvm::JITSymbolFlags::None);
return jit_symbol;
}
void registerSymbol(const std::string & symbol_name, void * symbol) { symbol_name_to_symbol_address[symbol_name] = symbol; }
~JITSymbolResolver() override = default;
private:
std::unordered_map<std::string, void *> symbol_name_to_symbol_address;
};
/// GDB JITEventListener. Can be used if result machine code need to be debugged.
// class JITEventListener
// {
// public:
// JITEventListener()
// : gdb_listener(llvm::JITEventListener::createGDBRegistrationListener())
// {}
// void notifyObjectLoaded(
// llvm::JITEventListener::ObjectKey object_key,
// const llvm::object::ObjectFile & object_file,
// const llvm::RuntimeDyld::LoadedObjectInfo & loaded_object_Info)
// {
// gdb_listener->notifyObjectLoaded(object_key, object_file, loaded_object_Info);
// }
// void notifyFreeingObject(llvm::JITEventListener::ObjectKey object_key)
// {
// gdb_listener->notifyFreeingObject(object_key);
// }
// private:
// llvm::JITEventListener * gdb_listener = nullptr;
// };
CHJIT::CHJIT()
: machine(getTargetMachine())
, layout(machine->createDataLayout())
, compiler(std::make_unique<JITCompiler>(*machine))
, symbol_resolver(std::make_unique<JITSymbolResolver>())
{
/// Define common symbols that can be generated during compilation
/// Necessary for valid linker symbol resolution
symbol_resolver->registerSymbol("memset", reinterpret_cast<void *>(&memset));
symbol_resolver->registerSymbol("memcpy", reinterpret_cast<void *>(&memcpy));
symbol_resolver->registerSymbol("memcmp", reinterpret_cast<void *>(&memcmp));
}
CHJIT::~CHJIT() = default;
CHJIT::CompiledModuleInfo CHJIT::compileModule(std::function<void (llvm::Module &)> compile_function)
{
std::lock_guard<std::mutex> lock(jit_lock);
auto module = createModuleForCompilation();
compile_function(*module);
auto module_info = compileModule(std::move(module));
++current_module_key;
return module_info;
}
std::unique_ptr<llvm::Module> CHJIT::createModuleForCompilation()
{
std::unique_ptr<llvm::Module> module = std::make_unique<llvm::Module>("jit" + std::to_string(current_module_key), context);
module->setDataLayout(layout);
module->setTargetTriple(machine->getTargetTriple().getTriple());
return module;
}
CHJIT::CompiledModuleInfo CHJIT::compileModule(std::unique_ptr<llvm::Module> module)
{
runOptimizationPassesOnModule(*module);
auto buffer = compiler->compile(*module);
llvm::Expected<std::unique_ptr<llvm::object::ObjectFile>> object = llvm::object::ObjectFile::createObjectFile(*buffer);
if (!object)
{
std::string error_message;
handleAllErrors(object.takeError(), [&](const llvm::ErrorInfoBase & error_info) { error_message = error_info.message(); });
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "Cannot create object file from compiled buffer: {}", error_message);
}
std::unique_ptr<JITModuleMemoryManager> module_memory_manager = std::make_unique<JITModuleMemoryManager>();
llvm::RuntimeDyld dynamic_linker = {module_memory_manager->getManager(), *symbol_resolver};
std::unique_ptr<llvm::RuntimeDyld::LoadedObjectInfo> linked_object = dynamic_linker.loadObject(*object.get());
dynamic_linker.resolveRelocations();
module_memory_manager->getManager().finalizeMemory();
CompiledModuleInfo module_info;
for (const auto & function : *module)
{
if (function.isDeclaration())
continue;
auto function_name = std::string(function.getName());
auto mangled_name = getMangledName(function_name);
auto jit_symbol = dynamic_linker.getSymbol(mangled_name);
if (!jit_symbol)
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "DynamicLinker could not found symbol {} after compilation", function_name);
auto * jit_symbol_address = reinterpret_cast<void *>(jit_symbol.getAddress());
std::string symbol_name = std::to_string(current_module_key) + '_' + function_name;
name_to_symbol[symbol_name] = jit_symbol_address;
module_info.compiled_functions.emplace_back(std::move(function_name));
}
module_info.size = module_memory_manager->getAllocatedSize();
module_info.identifier = current_module_key;
module_identifier_to_memory_manager[current_module_key] = std::move(module_memory_manager);
compiled_code_size.fetch_add(module_info.size, std::memory_order_relaxed);
return module_info;
}
void CHJIT::deleteCompiledModule(const CHJIT::CompiledModuleInfo & module_info)
{
std::lock_guard<std::mutex> lock(jit_lock);
auto module_it = module_identifier_to_memory_manager.find(module_info.identifier);
if (module_it == module_identifier_to_memory_manager.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no compiled module with identifier {}", module_info.identifier);
for (const auto & function : module_info.compiled_functions)
name_to_symbol.erase(function);
module_identifier_to_memory_manager.erase(module_it);
compiled_code_size.fetch_sub(module_info.size, std::memory_order_relaxed);
}
void * CHJIT::findCompiledFunction(const CompiledModuleInfo & module_info, const std::string & function_name) const
{
std::lock_guard<std::mutex> lock(jit_lock);
std::string symbol_name = std::to_string(module_info.identifier) + '_' + function_name;
auto it = name_to_symbol.find(symbol_name);
if (it != name_to_symbol.end())
return it->second;
return nullptr;
}
void CHJIT::registerExternalSymbol(const std::string & symbol_name, void * address)
{
std::lock_guard<std::mutex> lock(jit_lock);
symbol_resolver->registerSymbol(symbol_name, address);
}
std::string CHJIT::getMangledName(const std::string & name_to_mangle) const
{
std::string mangled_name;
llvm::raw_string_ostream mangled_name_stream(mangled_name);
llvm::Mangler::getNameWithPrefix(mangled_name_stream, name_to_mangle, layout);
mangled_name_stream.flush();
return mangled_name;
}
void CHJIT::runOptimizationPassesOnModule(llvm::Module & module) const
{
llvm::PassManagerBuilder pass_manager_builder;
llvm::legacy::PassManager mpm;
llvm::legacy::FunctionPassManager fpm(&module);
pass_manager_builder.OptLevel = 3;
pass_manager_builder.SLPVectorize = true;
pass_manager_builder.LoopVectorize = true;
pass_manager_builder.RerollLoops = true;
pass_manager_builder.VerifyInput = true;
pass_manager_builder.VerifyOutput = true;
machine->adjustPassManager(pass_manager_builder);
fpm.add(llvm::createTargetTransformInfoWrapperPass(machine->getTargetIRAnalysis()));
mpm.add(llvm::createTargetTransformInfoWrapperPass(machine->getTargetIRAnalysis()));
pass_manager_builder.populateFunctionPassManager(fpm);
pass_manager_builder.populateModulePassManager(mpm);
fpm.doInitialization();
for (auto & function : module)
fpm.run(function);
fpm.doFinalization();
mpm.run(module);
}
std::unique_ptr<llvm::TargetMachine> CHJIT::getTargetMachine()
{
static std::once_flag llvm_target_initialized;
std::call_once(llvm_target_initialized, []()
{
llvm::InitializeNativeTarget();
llvm::InitializeNativeTargetAsmPrinter();
llvm::sys::DynamicLibrary::LoadLibraryPermanently(nullptr);
});
std::string error;
auto cpu = llvm::sys::getHostCPUName();
auto triple = llvm::sys::getProcessTriple();
const auto * target = llvm::TargetRegistry::lookupTarget(triple, error);
if (!target)
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "Cannot find target triple {} error: {}", triple, error);
llvm::SubtargetFeatures features;
llvm::StringMap<bool> feature_map;
if (llvm::sys::getHostCPUFeatures(feature_map))
for (auto & f : feature_map)
features.AddFeature(f.first(), f.second);
llvm::TargetOptions options;
bool jit = true;
auto * target_machine = target->createTargetMachine(triple,
cpu,
features.getString(),
options,
llvm::None,
llvm::None,
llvm::CodeGenOpt::Aggressive,
jit);
if (!target_machine)
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "Cannot create target machine");
return std::unique_ptr<llvm::TargetMachine>(target_machine);
}
}
#endif

View File

@ -0,0 +1,120 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_EMBEDDED_COMPILER
#include <unordered_map>
#include <atomic>
#include <llvm/IR/LLVMContext.h>
#include <llvm/IR/Module.h>
#include <llvm/Target/TargetMachine.h>
namespace DB
{
class JITModuleMemoryManager;
class JITSymbolResolver;
class JITCompiler;
/** Custom jit implementation
* Main use cases:
* 1. Compiled functions in module.
* 2. Release memory for compiled functions.
*
* In LLVM library there are 2 main JIT stacks MCJIT and ORCv2.
*
* Main reasons for custom implementation vs MCJIT
* MCJIT keeps llvm::Module and compiled object code before linking process after module was compiled.
* llvm::Module can be removed, but compiled object code cannot be removed. Memory for compiled code
* will be release only during MCJIT instance destruction. It is too expensive to create MCJIT
* instance for each compiled module.
* Also important reason is that if some error occurred during compilation steps, MCJIT can just terminate
* our program.
*
* Main reasons for custom implementation vs ORCv2.
* ORC is on request compiler, we do not need support for asynchronous compilation.
* It was possible to remove compiled code with ORCv1 but it was deprecated.
* In ORCv2 this probably can be done only with custom layer and materialization unit.
* But it is inconvenient, discard is only called for materialization units by JITDylib that are not yet materialized.
*
* CHJIT interface is thread safe, that means all functions can be called from multiple threads and state of CHJIT instance
* will not be broken.
* It is client responsibility to be sure and do not use compiled code after it was released.
*/
class CHJIT
{
public:
CHJIT();
~CHJIT();
struct CompiledModuleInfo
{
/// Size of compiled module code in bytes
size_t size;
/// Module identifier. Should not be changed by client
uint64_t identifier;
/// Vector of compiled function nameds. Should not be changed by client
std::vector<std::string> compiled_functions;
};
/** Compile module. In compile function client responsibility is to fill module with necessary
* IR code, then it will be compiled by CHJIT instance.
* Return compiled module info.
*/
CompiledModuleInfo compileModule(std::function<void (llvm::Module &)> compile_function);
/** Delete compiled module. Pointers to functions from module become invalid after this call.
* It is client responsibility to be sure that there are no pointers to compiled module code.
*/
void deleteCompiledModule(const CompiledModuleInfo & module_info);
/** Find compiled function using module_info, and function_name.
* It is client responsibility to case result function to right signature.
* After call to deleteCompiledModule compiled functions from module become invalid.
*/
void * findCompiledFunction(const CompiledModuleInfo & module_info, const std::string & function_name) const;
/** Register external symbol for CHJIT instance to use, during linking.
* It can be function, or global constant.
* It is client responsibility to be sure that address of symbol is valid during CHJIT instance lifetime.
*/
void registerExternalSymbol(const std::string & symbol_name, void * address);
/** Total compiled code size for module that are currently valid.
*/
inline size_t getCompiledCodeSize() const { return compiled_code_size.load(std::memory_order_relaxed); }
private:
std::unique_ptr<llvm::Module> createModuleForCompilation();
CompiledModuleInfo compileModule(std::unique_ptr<llvm::Module> module);
std::string getMangledName(const std::string & name_to_mangle) const;
void runOptimizationPassesOnModule(llvm::Module & module) const;
static std::unique_ptr<llvm::TargetMachine> getTargetMachine();
llvm::LLVMContext context;
std::unique_ptr<llvm::TargetMachine> machine;
llvm::DataLayout layout;
std::unique_ptr<JITCompiler> compiler;
std::unique_ptr<JITSymbolResolver> symbol_resolver;
std::unordered_map<std::string, void *> name_to_symbol;
std::unordered_map<uint64_t, std::unique_ptr<JITModuleMemoryManager>> module_identifier_to_memory_manager;
uint64_t current_module_key = 0;
std::atomic<size_t> compiled_code_size = 0;
mutable std::mutex jit_lock;
};
}
#endif

View File

@ -0,0 +1,180 @@
#include "CompileDAG.h"
#if USE_EMBEDDED_COMPILER
#include <llvm/IR/BasicBlock.h>
#include <llvm/IR/Function.h>
#include <llvm/IR/IRBuilder.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitors.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/Native.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
llvm::Value * CompileDAG::compile(llvm::IRBuilderBase & builder, Values input_nodes_values) const
{
assert(input_nodes_values.size() == getInputNodesCount());
llvm::IRBuilder<> & b = static_cast<llvm::IRBuilder<> &>(builder);
PaddedPODArray<llvm::Value *> compiled_values;
compiled_values.resize_fill(nodes.size());
size_t input_nodes_values_index = 0;
size_t compiled_values_index = 0;
size_t dag_size = nodes.size();
for (size_t i = 0; i < dag_size; ++i)
{
const auto & node = nodes[i];
switch (node.type)
{
case CompileType::CONSTANT:
{
auto * native_value = getColumnNativeValue(b, node.result_type, *node.column, 0);
if (!native_value)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find native value for constant column with type {}",
node.result_type->getName());
compiled_values[compiled_values_index] = native_value;
break;
}
case CompileType::FUNCTION:
{
Values temporary_values;
temporary_values.reserve(node.arguments.size());
for (auto argument_index : node.arguments)
{
assert(compiled_values[argument_index] != nullptr);
temporary_values.emplace_back(compiled_values[argument_index]);
}
compiled_values[compiled_values_index] = node.function->compile(builder, temporary_values);
break;
}
case CompileType::INPUT:
{
compiled_values[compiled_values_index] = input_nodes_values[input_nodes_values_index];
++input_nodes_values_index;
break;
}
}
++compiled_values_index;
}
return compiled_values.back();
}
std::string CompileDAG::dump() const
{
std::vector<std::string> dumped_values;
dumped_values.resize(nodes.size());
size_t dag_size = nodes.size();
for (size_t i = 0; i < dag_size; ++i)
{
const auto & node = nodes[i];
switch (node.type)
{
case CompileType::CONSTANT:
{
const auto * column = typeid_cast<const ColumnConst *>(node.column.get());
const auto & data = column->getDataColumn();
dumped_values[i] = applyVisitor(FieldVisitorToString(), data[0]) + " : " + node.result_type->getName();
break;
}
case CompileType::FUNCTION:
{
std::string function_dump = node.function->getName();
function_dump += '(';
for (auto argument_index : node.arguments)
{
function_dump += dumped_values[argument_index];
function_dump += ", ";
}
if (!node.arguments.empty())
{
function_dump.pop_back();
function_dump.pop_back();
}
function_dump += ')';
dumped_values[i] = std::move(function_dump);
break;
}
case CompileType::INPUT:
{
dumped_values[i] = node.result_type->getName();
break;
}
}
}
return dumped_values.back();
}
UInt128 CompileDAG::hash() const
{
SipHash hash;
for (const auto & node : nodes)
{
hash.update(node.type);
const auto & result_type_name = node.result_type->getName();
hash.update(result_type_name.size());
hash.update(result_type_name);
switch (node.type)
{
case CompileType::CONSTANT:
{
assert_cast<const ColumnConst *>(node.column.get())->getDataColumn().updateHashWithValue(0, hash);
break;
}
case CompileType::FUNCTION:
{
const auto & function_name = node.function->getName();
hash.update(function_name.size());
hash.update(function_name);
for (size_t arg : node.arguments)
hash.update(arg);
break;
}
case CompileType::INPUT:
{
break;
}
}
}
UInt128 result;
hash.get128(result);
return result;
}
}
#endif

View File

@ -0,0 +1,89 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_EMBEDDED_COMPILER
#include <vector>
#include <Core/Types.h>
#include <Columns/IColumn.h>
#include <DataTypes/IDataType.h>
#include <Functions/IFunctionImpl.h>
namespace llvm
{
class Value;
class IRBuilderBase;
}
namespace DB
{
/** This class is needed to compile part of ActionsDAG.
* For example we have expression (a + 1) + (b + 1) in actions dag.
* It must be added into CompileDAG in order of compile evaluation.
* Node a, Constant 1, Function add(a + 1), Input b, Constant 1, Function add(b, 1), Function add(add(a + 1), add(a + 1)).
*
* Compile function must be called with input_nodes_values equal to input nodes count.
* When compile method is called added nodes are compiled in order.
*/
class CompileDAG
{
public:
enum class CompileType
{
INPUT = 0,
CONSTANT = 1,
FUNCTION = 2,
};
struct Node
{
CompileType type;
DataTypePtr result_type;
/// For CONSTANT
ColumnPtr column;
/// For FUNCTION
FunctionBasePtr function;
std::vector<size_t> arguments;
};
llvm::Value * compile(llvm::IRBuilderBase & builder, Values input_nodes_values) const;
std::string dump() const;
UInt128 hash() const;
void addNode(Node node)
{
input_nodes_count += (node.type == CompileType::INPUT);
nodes.emplace_back(std::move(node));
}
inline size_t getNodesCount() const { return nodes.size(); }
inline size_t getInputNodesCount() const { return input_nodes_count; }
inline Node & operator[](size_t index) { return nodes[index]; }
inline const Node & operator[](size_t index) const { return nodes[index]; }
inline Node & front() { return nodes.front(); }
inline const Node & front() const { return nodes.front(); }
inline Node & back() { return nodes.back(); }
inline const Node & back() const { return nodes.back(); }
private:
std::vector<Node> nodes;
size_t input_nodes_count = 0;
};
}
#endif

View File

@ -0,0 +1,187 @@
#include "compileFunction.h"
#if USE_EMBEDDED_COMPILER
#include <llvm/IR/BasicBlock.h>
#include <llvm/IR/Function.h>
#include <llvm/IR/IRBuilder.h>
#include <Common/Stopwatch.h>
#include <Common/ProfileEvents.h>
#include <DataTypes/Native.h>
#include <Interpreters/JIT/CHJIT.h>
namespace
{
struct ColumnDataPlaceholder
{
llvm::Value * data_init = nullptr; /// first row
llvm::Value * null_init = nullptr;
llvm::PHINode * data = nullptr; /// current row
llvm::PHINode * null = nullptr;
};
}
namespace ProfileEvents
{
extern const Event CompileFunction;
extern const Event CompileExpressionsMicroseconds;
extern const Event CompileExpressionsBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ColumnData getColumnData(const IColumn * column)
{
ColumnData result;
const bool is_const = isColumnConst(*column);
if (is_const)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Input columns should not be constant");
if (const auto * nullable = typeid_cast<const ColumnNullable *>(column))
{
result.null_data = nullable->getNullMapColumn().getRawData().data;
column = & nullable->getNestedColumn();
}
result.data = column->getRawData().data;
return result;
}
static void compileFunction(llvm::Module & module, const IFunctionBaseImpl & function)
{
/** Algorithm is to create a loop that iterate over ColumnDataRowsSize size_t argument and
* over ColumnData data and null_data. On each step compiled expression from function
* will be executed over column data and null_data row.
*/
ProfileEvents::increment(ProfileEvents::CompileFunction);
const auto & arg_types = function.getArgumentTypes();
llvm::IRBuilder<> b(module.getContext());
auto * size_type = b.getIntNTy(sizeof(size_t) * 8);
auto * data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * func_type = llvm::FunctionType::get(b.getVoidTy(), { size_type, data_type->getPointerTo() }, /*isVarArg=*/false);
/// Create function in module
auto * func = llvm::Function::Create(func_type, llvm::Function::ExternalLinkage, function.getName(), module);
auto * args = func->args().begin();
llvm::Value * counter_arg = &*args++;
llvm::Value * columns_arg = &*args++;
/// Initialize ColumnDataPlaceholder llvm representation of ColumnData
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", func);
b.SetInsertPoint(entry);
std::vector<ColumnDataPlaceholder> columns(arg_types.size() + 1);
for (size_t i = 0; i <= arg_types.size(); ++i)
{
const auto & type = i == arg_types.size() ? function.getResultType() : arg_types[i];
auto * data = b.CreateLoad(b.CreateConstInBoundsGEP1_32(data_type, columns_arg, i));
columns[i].data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(type))->getPointerTo());
columns[i].null_init = type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
}
/// Initialize loop
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", func);
b.CreateBr(loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(counter_arg->getType(), 2);
counter_phi->addIncoming(counter_arg, entry);
for (auto & col : columns)
{
col.data = b.CreatePHI(col.data_init->getType(), 2);
col.data->addIncoming(col.data_init, entry);
if (col.null_init)
{
col.null = b.CreatePHI(col.null_init->getType(), 2);
col.null->addIncoming(col.null_init, entry);
}
}
/// Initialize column row values
Values arguments;
arguments.reserve(arg_types.size());
for (size_t i = 0; i < arg_types.size(); ++i)
{
auto & column = columns[i];
auto type = arg_types[i];
auto * value = b.CreateLoad(column.data);
if (!column.null)
{
arguments.emplace_back(value);
continue;
}
auto * is_null = b.CreateICmpNE(b.CreateLoad(column.null), b.getInt8(0));
auto * nullable_unitilized = llvm::Constant::getNullValue(toNativeType(b, type));
auto * nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, value, {0}), is_null, {1});
arguments.emplace_back(nullable_value);
}
/// Compile values for column rows and store compiled value in result column
auto * result = function.compile(b, std::move(arguments));
if (columns.back().null)
{
b.CreateStore(b.CreateExtractValue(result, {0}), columns.back().data);
b.CreateStore(b.CreateSelect(b.CreateExtractValue(result, {1}), b.getInt8(1), b.getInt8(0)), columns.back().null);
}
else
{
b.CreateStore(result, columns.back().data);
}
/// End of loop
auto * cur_block = b.GetInsertBlock();
for (auto & col : columns)
{
col.data->addIncoming(b.CreateConstInBoundsGEP1_32(nullptr, col.data, 1), cur_block);
if (col.null)
col.null->addIncoming(b.CreateConstInBoundsGEP1_32(nullptr, col.null, 1), cur_block);
}
counter_phi->addIncoming(b.CreateSub(counter_phi, llvm::ConstantInt::get(size_type, 1)), cur_block);
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", func);
b.CreateCondBr(b.CreateICmpNE(counter_phi, llvm::ConstantInt::get(size_type, 1)), loop, end);
b.SetInsertPoint(end);
b.CreateRetVoid();
}
CHJIT::CompiledModuleInfo compileFunction(CHJIT & jit, const IFunctionBaseImpl & function)
{
Stopwatch watch;
auto compiled_module_info = jit.compileModule([&](llvm::Module & module)
{
compileFunction(module, function);
});
ProfileEvents::increment(ProfileEvents::CompileExpressionsMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::CompileExpressionsBytes, compiled_module_info.size);
ProfileEvents::increment(ProfileEvents::CompileFunction);
return compiled_module_info;
}
}
#endif

View File

@ -0,0 +1,46 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_EMBEDDED_COMPILER
#include <Functions/IFunctionImpl.h>
#include <Interpreters/JIT/CHJIT.h>
namespace DB
{
/** ColumnData structure to pass into compiled function.
* data is raw column data.
* null_data is null map column raw data.
*/
struct ColumnData
{
const char * data = nullptr;
const char * null_data = nullptr;
};
/** Returns ColumnData for column.
* If constant column is passed, LOGICAL_ERROR will be thrown.
*/
ColumnData getColumnData(const IColumn * column);
using ColumnDataRowsSize = size_t;
using JITCompiledFunction = void (*)(ColumnDataRowsSize, ColumnData *);
/** Compile function to native jit code using CHJIT instance.
* Function is compiled as single module.
* After this function execution, code for function will be compiled and can be queried using
* findCompiledFunction with function name.
* Compiled function can be safely casted to JITCompiledFunction type and must be called with
* valid ColumnData and ColumnDataRowsSize.
* It is important that ColumnData parameter of JITCompiledFunction is result column,
* and will be filled by compiled function.
*/
CHJIT::CompiledModuleInfo compileFunction(CHJIT & jit, const IFunctionBaseImpl & function);
}
#endif

View File

@ -35,3 +35,6 @@ target_link_libraries (string_hash_set PRIVATE dbms)
add_executable (two_level_hash_map two_level_hash_map.cpp)
target_include_directories (two_level_hash_map SYSTEM BEFORE PRIVATE ${SPARSEHASH_INCLUDE_DIR})
target_link_libraries (two_level_hash_map PRIVATE dbms)
add_executable (jit_example jit_example.cpp)
target_link_libraries (jit_example PRIVATE dbms)

View File

@ -0,0 +1,57 @@
#include <iostream>
#include <llvm/IR/IRBuilder.h>
#include <Interpreters/JIT/CHJIT.h>
void test_function()
{
std::cerr << "Test function" << std::endl;
}
int main(int argc, char **argv)
{
(void)(argc);
(void)(argv);
auto jit = DB::CHJIT();
jit.registerExternalSymbol("test_function", reinterpret_cast<void *>(&test_function));
auto compiled_module_info = jit.compileModule([](llvm::Module & module)
{
auto & context = module.getContext();
llvm::IRBuilder<> b (context);
auto * func_declaration_type = llvm::FunctionType::get(b.getVoidTy(), { }, /*isVarArg=*/false);
auto * func_declaration = llvm::Function::Create(func_declaration_type, llvm::Function::ExternalLinkage, "test_function", module);
auto * value_type = b.getInt64Ty();
auto * pointer_type = value_type->getPointerTo();
auto * func_type = llvm::FunctionType::get(b.getVoidTy(), { pointer_type }, /*isVarArg=*/false);
auto * function = llvm::Function::Create(func_type, llvm::Function::ExternalLinkage, "test_name", module);
auto * entry = llvm::BasicBlock::Create(context, "entry", function);
auto * argument = function->args().begin();
b.SetInsertPoint(entry);
b.CreateCall(func_declaration);
auto * load_argument = b.CreateLoad(argument);
auto * value = b.CreateAdd(load_argument, load_argument);
b.CreateRet(value);
});
for (const auto & compiled_function_name : compiled_module_info.compiled_functions)
{
std::cerr << compiled_function_name << std::endl;
}
int64_t value = 5;
auto * test_name_function = reinterpret_cast<int64_t (*)(int64_t *)>(jit.findCompiledFunction(compiled_module_info, "test_name"));
auto result = test_name_function(&value);
std::cerr << "Result " << result << std::endl;
return 0;
}

View File

@ -445,6 +445,7 @@ void StorageS3::updateClientAndAuthSettings(ContextPtr ctx, StorageS3::ClientAut
}
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.region,
ctx->getRemoteHostFilter(), ctx->getGlobalContext()->getSettingsRef().s3_max_redirects);
client_configuration.endpointOverride = upd.uri.endpoint;

View File

@ -30,6 +30,7 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
auto endpoint = config.getString(config_elem + "." + key + ".endpoint");
auto access_key_id = config.getString(config_elem + "." + key + ".access_key_id", "");
auto secret_access_key = config.getString(config_elem + "." + key + ".secret_access_key", "");
auto region = config.getString(config_elem + "." + key + ".region", "");
auto server_side_encryption_customer_key_base64 = config.getString(config_elem + "." + key + ".server_side_encryption_customer_key_base64", "");
std::optional<bool> use_environment_credentials;
if (config.has(config_elem + "." + key + ".use_environment_credentials"))
@ -57,7 +58,14 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
}
}
settings.emplace(endpoint, S3AuthSettings{std::move(access_key_id), std::move(secret_access_key), std::move(server_side_encryption_customer_key_base64), std::move(headers), use_environment_credentials, use_insecure_imds_request});
settings.emplace(endpoint, S3AuthSettings{
std::move(access_key_id), std::move(secret_access_key),
std::move(region),
std::move(server_side_encryption_customer_key_base64),
std::move(headers),
use_environment_credentials,
use_insecure_imds_request
});
}
}
}

View File

@ -28,6 +28,7 @@ struct S3AuthSettings
{
String access_key_id;
String secret_access_key;
String region;
String server_side_encryption_customer_key_base64;
HeaderCollection headers;
@ -38,7 +39,9 @@ struct S3AuthSettings
inline bool operator==(const S3AuthSettings & other) const
{
return access_key_id == other.access_key_id && secret_access_key == other.secret_access_key
&& server_side_encryption_customer_key_base64 == other.server_side_encryption_customer_key_base64 && headers == other.headers
&& region == other.region
&& server_side_encryption_customer_key_base64 == other.server_side_encryption_customer_key_base64
&& headers == other.headers
&& use_environment_credentials == other.use_environment_credentials
&& use_insecure_imds_request == other.use_insecure_imds_request;
}

View File

@ -198,6 +198,7 @@ def test_inserts_single_replica_local_internal_replication(started_cluster):
def test_inserts_single_replica_internal_replication(started_cluster):
try:
node1.query(
"INSERT INTO distributed_one_replica_internal_replication VALUES ('2000-01-01', 1)",
settings={
@ -208,9 +209,12 @@ def test_inserts_single_replica_internal_replication(started_cluster):
},
)
assert node2.query("SELECT count(*) FROM single_replicated").strip() == '1'
finally:
node2.query("TRUNCATE TABLE single_replicated")
def test_inserts_single_replica_no_internal_replication(started_cluster):
try:
with pytest.raises(QueryRuntimeException, match="Table default.single_replicated doesn't exist"):
node1.query(
"INSERT INTO distributed_one_replica_no_internal_replication VALUES ('2000-01-01', 1)",
@ -220,6 +224,8 @@ def test_inserts_single_replica_no_internal_replication(started_cluster):
},
)
assert node2.query("SELECT count(*) FROM single_replicated").strip() == '1'
finally:
node2.query("TRUNCATE TABLE single_replicated")
def test_prefer_localhost_replica(started_cluster):

View File

@ -0,0 +1,33 @@
import http.server
import sys
class RequestHandler(http.server.BaseHTTPRequestHandler):
def do_HEAD(self):
if self.path.startswith("/get-my-path/"):
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
elif self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
else:
self.send_response(404)
self.send_header("Content-Type", "text/plain")
self.end_headers()
def do_GET(self):
self.do_HEAD()
if self.path.startswith("/get-my-path/"):
self.wfile.write(b'/' + self.path.split('/', maxsplit=2)[2].encode())
elif self.path == "/":
self.wfile.write(b"OK")
httpd = http.server.HTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()

View File

@ -146,6 +146,47 @@ def test_put(cluster, maybe_auth, positive, compression):
assert values_csv == get_s3_file_content(cluster, bucket, filename)
@pytest.mark.parametrize("special", [
"space",
"plus"
])
def test_get_file_with_special(cluster, special):
symbol = {"space": " ", "plus": "+"}[special]
urlsafe_symbol = {"space": "%20", "plus": "%2B"}[special]
auth = "'minio','minio123',"
bucket = cluster.minio_restricted_bucket
instance = cluster.instances["dummy"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [[12549, 2463, 19893], [64021, 38652, 66703], [81611, 39650, 83516], [11079, 59507, 61546], [51764, 69952, 6876], [41165, 90293, 29095], [40167, 78432, 48309], [81629, 81327, 11855], [55852, 21643, 98507], [6738, 54643, 41155]]
values_csv = ('\n'.join((','.join(map(str, row)) for row in values)) + '\n').encode()
filename = f"get_file_with_{special}_{symbol}two.csv"
put_s3_file_content(cluster, bucket, filename, values_csv)
get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}_{urlsafe_symbol}two.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values
get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}*.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values
get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}_{urlsafe_symbol}*.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values
@pytest.mark.parametrize("special", [
"space",
"plus",
"plus2"
])
def test_get_path_with_special(cluster, special):
symbol = {"space": "%20", "plus": "%2B", "plus2": "%2B"}[special]
safe_symbol = {"space": "%20", "plus": "+", "plus2": "%2B"}[special]
auth = "'minio','minio123',"
table_format = "column1 String"
instance = cluster.instances["dummy"]
get_query = f"SELECT * FROM s3('http://resolver:8082/get-my-path/{safe_symbol}.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
assert run_query(instance, get_query).splitlines() == [f"/{symbol}.csv"]
# Test put no data to S3.
@pytest.mark.parametrize("auth", [
"'minio','minio123',"
@ -389,6 +430,7 @@ def run_s3_mocks(cluster):
mocks = (
("mock_s3.py", "resolver", "8080"),
("unstable_server.py", "resolver", "8081"),
("echo.py", "resolver", "8082"),
)
for mock_filename, container, port in mocks:
container_id = cluster.get_container_id(container)

View File

@ -0,0 +1,2 @@
0
1

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
query_id="aggregating_merge_tree_simple_aggregate_function_string_query100_profile100_$CLICKHOUSE_DATABASE"
${CLICKHOUSE_CLIENT} --query="select sleep(1)" --query_id="$query_id" --query_profiler_real_time_period_ns=10000000
${CLICKHOUSE_CLIENT} --query="system flush logs"
${CLICKHOUSE_CLIENT} --query="select count(*) > 1 from system.trace_log where query_id = '$query_id'"

View File

@ -0,0 +1,7 @@
test_jit_nonnull
0 0 0
1 2 1
test_jit_nullable
0 0 0
1 2 1
\N 0 0

View File

@ -0,0 +1,16 @@
SET compile_expressions = 1;
SET min_count_to_compile_expression = 0;
DROP TABLE IF EXISTS test_jit_nonnull;
CREATE TABLE test_jit_nonnull (value UInt8) ENGINE = TinyLog;
INSERT INTO test_jit_nonnull VALUES (0), (1);
SELECT 'test_jit_nonnull';
SELECT value, multiIf(value = 1, 2, value, 1, 0), if (value, 1, 0) FROM test_jit_nonnull;
DROP TABLE IF EXISTS test_jit_nullable;
CREATE TABLE test_jit_nullable (value Nullable(UInt8)) ENGINE = TinyLog;
INSERT INTO test_jit_nullable VALUES (0), (1), (NULL);
SELECT 'test_jit_nullable';
SELECT value, multiIf(value = 1, 2, value, 1, 0), if (value, 1, 0) FROM test_jit_nullable;

View File

@ -0,0 +1,168 @@
#!/usr/bin/env python3
from http.server import SimpleHTTPRequestHandler,HTTPServer
import socket
import csv
import sys
import tempfile
import threading
import os
import gzip
import traceback
import urllib.request
import subprocess
import lzma
def get_local_port(host):
with socket.socket() as fd:
fd.bind((host, 0))
return fd.getsockname()[1]
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')
#####################################################################################
# This test starts an HTTP server and serves data to clickhouse url-engine based table.
# The main goal of this test is checking that compress methods are working.
# In order for it to work ip+port of http server (given below) should be
# accessible from clickhouse server.
#####################################################################################
# IP-address of this host accessible from outside world.
HTTP_SERVER_HOST = subprocess.check_output(['hostname', '-i']).decode('utf-8').strip()
HTTP_SERVER_PORT = get_local_port(HTTP_SERVER_HOST)
# IP address and port of the HTTP server started from this script.
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/"
# Because we need to check content of file.csv we can create this content and avoid reading csv
CSV_DATA = "Hello, 1\nWorld, 2\nThis, 152\nis, 9283\ntesting, 2313213\ndata, 555\n"
# Choose compression method
# (Will change during test, need to check standart data sending, to make sure that nothing broke)
COMPRESS_METHOD = 'none'
ADDING_ENDING = ''
ENDINGS = ['.gz', '.xz']
SEND_ENCODING = True
def get_ch_answer(query):
url = os.environ.get('CLICKHOUSE_URL', 'http://{host}:{port}'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP))
return urllib.request.urlopen(url, data=query.encode()).read().decode()
def check_answers(query, answer):
ch_answer = get_ch_answer(query)
if ch_answer.strip() != answer.strip():
print("FAIL on query:", query, file=sys.stderr)
print("Expected answer:", answer, file=sys.stderr)
print("Fetched answer :", ch_answer, file=sys.stderr)
raise Exception("Fail on query")
# Server with head method which is useful for debuging by hands
class HttpProcessor(SimpleHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
if SEND_ENCODING:
self.send_header('Content-Encoding', COMPRESS_METHOD)
if COMPRESS_METHOD == 'none':
self.send_header('Content-Length', len(CSV_DATA.encode()))
else:
self.compress_data()
self.send_header('Content-Length', len(self.data))
self.send_header('Content-Type', 'text/csv')
self.end_headers()
def do_HEAD(self):
self._set_headers()
return
def compress_data(self):
if COMPRESS_METHOD == 'gzip':
self.data = gzip.compress((CSV_DATA).encode())
elif COMPRESS_METHOD == 'lzma':
self.data = lzma.compress((CSV_DATA).encode())
else:
self.data = 'WRONG CONVERSATION'.encode()
def do_GET(self):
self._set_headers()
if COMPRESS_METHOD == 'none':
self.wfile.write(CSV_DATA.encode())
else:
self.wfile.write(self.data)
return
def log_message(self, format, *args):
return
def start_server(requests_amount):
httpd = HTTPServer(HTTP_SERVER_ADDRESS, HttpProcessor)
def real_func():
for i in range(requests_amount):
httpd.handle_request()
t = threading.Thread(target=real_func)
return t
#####################################################################
# Testing area.
#####################################################################
def test_select(dict_name="", schema="word String, counter UInt32", requests=[], answers=[], test_data=""):
global ADDING_ENDING
global SEND_ENCODING
global COMPRESS_METHOD
for i in range(len(requests)):
if i > 2:
ADDING_ENDING = ENDINGS[i-3]
SEND_ENCODING = False
if dict_name:
get_ch_answer("drop dictionary if exists {}".format(dict_name))
get_ch_answer('''CREATE DICTIONARY {} ({})
PRIMARY KEY word
SOURCE(HTTP(url '{}' format 'CSV'))
LAYOUT(complex_key_hashed())
LIFETIME(0)'''.format(dict_name, schema, HTTP_SERVER_URL_STR+'/test.csv' + ADDING_ENDING))
COMPRESS_METHOD = requests[i]
print(i, COMPRESS_METHOD, ADDING_ENDING, SEND_ENCODING)
check_answers("select * from {}".format(dict_name), answers[i])
def main():
# first three for encoding, second three for url
insert_requests = [
'none',
'gzip',
'lzma',
'gzip',
'lzma'
]
# This answers got experemently in non compressed mode and they are correct
answers = ['''This 152\nHello 1\nis 9283\ndata 555\nWorld 2\ntesting 2313213'''] * 5
t = start_server(len(insert_requests))
t.start()
test_select(dict_name="test_table_select", requests=insert_requests, answers=answers)
t.join()
print("PASSED")
if __name__ == "__main__":
try:
main()
except Exception as ex:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, file=sys.stderr)
print(ex, file=sys.stderr)
sys.stderr.flush()
os._exit(1)

View File

@ -0,0 +1,6 @@
0 none True
1 gzip True
2 lzma True
3 gzip .gz False
4 lzma .xz False
PASSED

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
python3 "$CURDIR"/01854_HTTP_dict_decompression.python

View File

@ -0,0 +1,6 @@
1
1
1
1
1
1

View File

@ -0,0 +1,15 @@
SET compile_expressions = 1;
SET min_count_to_compile_expression = 0;
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table (a UInt64) ENGINE = MergeTree() ORDER BY tuple();
INSERT INTO test_table VALUES (1);
SELECT test_table.a FROM test_table ORDER BY (test_table.a > test_table.a) + 1;
SELECT test_table.a FROM test_table ORDER BY (test_table.a >= test_table.a) + 1;
SELECT test_table.a FROM test_table ORDER BY (test_table.a < test_table.a) + 1;
SELECT test_table.a FROM test_table ORDER BY (test_table.a <= test_table.a) + 1;
SELECT test_table.a FROM test_table ORDER BY (test_table.a == test_table.a) + 1;
SELECT test_table.a FROM test_table ORDER BY (test_table.a != test_table.a) + 1;

View File

@ -0,0 +1,12 @@
1
1
1
1
1
1
1
1
1
1
10020

View File

@ -0,0 +1,20 @@
SELECT uniqCombined(number)
FROM numbers(10000)
GROUP BY number
WITH TOTALS
ORDER BY number DESC
LIMIT 10
SETTINGS
/* force aggregates serialization to trigger the issue with */
max_bytes_before_external_group_by=1,
/* overflow row: */
max_rows_to_group_by=10000000000,
group_by_overflow_mode='any',
totals_mode='before_having',
/* this is to account memory under 4MB (for max_bytes_before_external_group_by) to use less rows */
max_untracked_memory=0,
group_by_two_level_threshold=10000,
/* explicitly */
max_block_size=1000,
max_threads=1
;

View File

@ -2,6 +2,7 @@ v21.4.6.55-stable 2021-04-30
v21.4.5.46-stable 2021-04-24
v21.4.4.30-stable 2021-04-16
v21.4.3.21-stable 2021-04-12
v21.3.10.1-lts 2021-05-09
v21.3.9.83-lts 2021-04-28
v21.3.8.76-lts 2021-04-24
v21.3.7.62-stable 2021-04-16

1 v21.4.6.55-stable 2021-04-30
2 v21.4.5.46-stable 2021-04-24
3 v21.4.4.30-stable 2021-04-16
4 v21.4.3.21-stable 2021-04-12
5 v21.3.10.1-lts 2021-05-09
6 v21.3.9.83-lts 2021-04-28
7 v21.3.8.76-lts 2021-04-24
8 v21.3.7.62-stable 2021-04-16