Merge branch 'master' into add_symlinks_for_lld

Alexey Milovidov 2020-02-16 14:01:47 +03:00
commit bec14b196f
378 changed files with 8098 additions and 2745 deletions


@@ -52,12 +52,12 @@ IncludeCategories:
ReflowComments: false
AlignEscapedNewlinesLeft: false
AlignEscapedNewlines: DontAlign
AlignTrailingComments: true
# Not changed:
AccessModifierOffset: -4
AlignConsecutiveAssignments: false
AlignOperands: false
AlignTrailingComments: false
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false

1
.gitignore vendored

@@ -15,6 +15,7 @@
/docs/build
/docs/publish
/docs/edit
/docs/website
/docs/tools/venv/
/docs/en/single.md
/docs/ru/single.md

4
.gitmodules vendored

@@ -104,6 +104,10 @@
[submodule "contrib/sparsehash-c11"]
path = contrib/sparsehash-c11
url = https://github.com/sparsehash/sparsehash-c11.git
[submodule "contrib/grpc"]
path = contrib/grpc
url = https://github.com/grpc/grpc.git
branch = v1.25.0
[submodule "contrib/aws"]
path = contrib/aws
url = https://github.com/aws/aws-sdk-cpp.git


@@ -341,6 +341,7 @@ endif()
include (cmake/find/libxml2.cmake)
include (cmake/find/brotli.cmake)
include (cmake/find/protobuf.cmake)
include (cmake/find/grpc.cmake)
include (cmake/find/pdqsort.cmake)
include (cmake/find/hdfs3.cmake) # uses protobuf
include (cmake/find/s3.cmake)


@@ -1,18 +1,17 @@
[![ClickHouse — open source distributed column-oriented DBMS](https://github.com/ClickHouse/ClickHouse/raw/master/website/images/logo-400x240.png)](https://clickhouse.yandex)
[![ClickHouse — open source distributed column-oriented DBMS](https://github.com/ClickHouse/ClickHouse/raw/master/website/images/logo-400x240.png)](https://clickhouse.tech)
ClickHouse is an open-source column-oriented database management system that allows generating analytical data reports in real time.
## Useful Links
* [Official website](https://clickhouse.yandex/) has quick high-level overview of ClickHouse on main page.
* [Tutorial](https://clickhouse.yandex/tutorial.html) shows how to set up and query small ClickHouse cluster.
* [Documentation](https://clickhouse.yandex/docs/en/) provides more in-depth information.
* [Official website](https://clickhouse.tech/) has quick high-level overview of ClickHouse on main page.
* [Tutorial](https://clickhouse.tech/docs/en/getting_started/tutorial/) shows how to set up and query small ClickHouse cluster.
* [Documentation](https://clickhouse.tech/docs/en/) provides more in-depth information.
* [YouTube channel](https://www.youtube.com/c/ClickHouseDB) has a lot of content about ClickHouse in video format.
* [Blog](https://clickhouse.yandex/blog/en/) contains various ClickHouse-related articles, as well as announces and reports about events.
* [Contacts](https://clickhouse.yandex/#contacts) can help to get your questions answered if there are any.
* [Contacts](https://clickhouse.tech/#contacts) can help to get your questions answered if there are any.
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
## Upcoming Events
* [ClickHouse Meetup in San Francisco](https://www.eventbrite.com/e/clickhouse-february-meetup-registration-88496227599) on February 5.
* [ClickHouse Meetup in New York](https://www.meetup.com/Uber-Engineering-Events-New-York/events/268328663/) on February 11.
* [ClickHouse Meetup in Athens](https://www.meetup.com/Athens-Big-Data/events/268379195/) on March 5.

57
cmake/find/grpc.cmake Normal file

@@ -0,0 +1,57 @@
set(_PROTOBUF_PROTOC $<TARGET_FILE:protoc>)
set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:grpc_cpp_plugin>)
function(PROTOBUF_GENERATE_GRPC_CPP SRCS HDRS)
if(NOT ARGN)
message(SEND_ERROR "Error: PROTOBUF_GENERATE_GRPC_CPP() called without any proto files")
return()
endif()
if(PROTOBUF_GENERATE_CPP_APPEND_PATH)
foreach(FIL ${ARGN})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
get_filename_component(ABS_PATH ${ABS_FIL} PATH)
list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
if(${_contains_already} EQUAL -1)
list(APPEND _protobuf_include_path -I ${ABS_PATH})
endif()
endforeach()
else()
set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR})
endif()
if(DEFINED PROTOBUF_IMPORT_DIRS)
foreach(DIR ${Protobuf_IMPORT_DIRS})
get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
if(${_contains_already} EQUAL -1)
list(APPEND _protobuf_include_path -I ${ABS_PATH})
endif()
endforeach()
endif()
set(${SRCS})
set(${HDRS})
foreach(FIL ${ARGN})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
get_filename_component(FIL_WE ${FIL} NAME_WE)
list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.grpc.pb.cc")
list(APPEND ${HDRS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.grpc.pb.h")
add_custom_command(
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.grpc.pb.cc"
"${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.grpc.pb.h"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out=${CMAKE_CURRENT_BINARY_DIR}
--plugin=protoc-gen-grpc=${_GRPC_CPP_PLUGIN_EXECUTABLE}
${_protobuf_include_path} ${ABS_FIL}
DEPENDS ${ABS_FIL}
COMMENT "Running gRPC C++ protocol buffer compiler on ${FIL}"
VERBATIM)
endforeach()
set_source_files_properties(${${SRCS}} ${${HDRS}} PROPERTIES GENERATED TRUE)
set(${SRCS} ${${SRCS}} PARENT_SCOPE)
set(${HDRS} ${${HDRS}} PARENT_SCOPE)
endfunction()


@@ -1,4 +1,4 @@
if (NOT APPLE AND NOT ARCH_32)
if (NOT ARCH_32)
option (USE_INTERNAL_LIBGSASL_LIBRARY "Set to FALSE to use system libgsasl library instead of bundled" ${NOT_UNBUNDLED})
endif ()
@@ -16,7 +16,7 @@ if (NOT USE_INTERNAL_LIBGSASL_LIBRARY)
endif ()
if (LIBGSASL_LIBRARY AND LIBGSASL_INCLUDE_DIR)
elseif (NOT MISSING_INTERNAL_LIBGSASL_LIBRARY AND NOT APPLE AND NOT ARCH_32)
elseif (NOT MISSING_INTERNAL_LIBGSASL_LIBRARY AND NOT ARCH_32)
set (LIBGSASL_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libgsasl/src ${ClickHouse_SOURCE_DIR}/contrib/libgsasl/linux_x86_64/include)
set (USE_INTERNAL_LIBGSASL_LIBRARY 1)
set (LIBGSASL_LIBRARY libgsasl)


@@ -1,5 +1,5 @@
# Freebsd: contrib/cppkafka/include/cppkafka/detail/endianness.h:53:23: error: 'betoh16' was not declared in this scope
if (NOT ARCH_ARM AND NOT ARCH_32 AND NOT APPLE AND NOT OS_FREEBSD AND OPENSSL_FOUND)
if (NOT ARCH_ARM AND NOT ARCH_32 AND NOT OS_FREEBSD AND OPENSSL_FOUND)
option (ENABLE_RDKAFKA "Enable kafka" ${ENABLE_LIBRARIES})
endif ()
@@ -10,7 +10,7 @@ endif ()
if (ENABLE_RDKAFKA)
if (OS_LINUX AND NOT ARCH_ARM AND USE_LIBGSASL)
if (NOT ARCH_ARM AND USE_LIBGSASL)
option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
endif ()


@@ -336,4 +336,6 @@ if (USE_FASTOPS)
add_subdirectory (fastops-cmake)
endif()
add_subdirectory(grpc-cmake)
add_subdirectory(replxx-cmake)

2
contrib/avro vendored

@@ -1 +1 @@
Subproject commit 5b2752041c8d2f75eb5c1dbec8b4c25fc0e24d12
Subproject commit 6cfcf6c24293af100d523b89b61d1ab216fa4735


@@ -146,3 +146,5 @@ target_compile_definitions(curl PRIVATE HAVE_CONFIG_H BUILDING_LIBCURL CURL_HIDD
target_include_directories(curl PUBLIC ${CURL_DIR}/include ${CURL_DIR}/lib .)
target_compile_definitions(curl PRIVATE OS="${CMAKE_SYSTEM_NAME}")
target_link_libraries(curl PRIVATE ssl)


@@ -1,3 +1,4 @@
#define CURL_CA_BUNDLE "/etc/ssl/certs/ca-certificates.crt"
#define CURL_DISABLE_FTP
#define CURL_DISABLE_TFTP
#define CURL_DISABLE_LDAP
@@ -9,9 +10,14 @@
#define SIZEOF_CURL_OFF_T 8
#define SIZEOF_SIZE_T 8
#define HAVE_ALARM
#define HAVE_FCNTL_O_NONBLOCK
#define HAVE_GETADDRINFO
#define HAVE_LONGLONG
#define HAVE_POLL_FINE
#define HAVE_SIGACTION
#define HAVE_SIGNAL
#define HAVE_SIGSETJMP
#define HAVE_SOCKET
#define HAVE_STRUCT_TIMEVAL
@@ -34,5 +40,11 @@
#define HAVE_ERRNO_H
#define HAVE_FCNTL_H
#define HAVE_NETDB_H
#define HAVE_NETINET_IN_H
#define HAVE_SETJMP_H
#define HAVE_SYS_STAT_H
#define HAVE_UNISTD_H
#define ENABLE_IPV6
#define USE_OPENSSL
#define USE_THREADS_POSIX

1
contrib/grpc vendored Submodule

@@ -0,0 +1 @@
Subproject commit c1d176528fd8da9dd4066d16554bcd216d29033f

File diff suppressed because it is too large

2
contrib/libgsasl vendored

@@ -1 +1 @@
Subproject commit 3b8948a4042e34fb00b4fb987535dc9e02e39040
Subproject commit 42ef20687042637252e64df1934b6d47771486d1

2
contrib/librdkafka vendored

@@ -1 +1 @@
Subproject commit 6160ec275a5bb0a4088ede3c5f2afde638bbef65
Subproject commit 4ffe54b4f59ee5ae3767f9f25dc14651a3384d62


@@ -23,6 +23,8 @@ set(SRCS
${RDKAFKA_SOURCE_DIR}/rdkafka_lz4.c
${RDKAFKA_SOURCE_DIR}/rdkafka_metadata.c
${RDKAFKA_SOURCE_DIR}/rdkafka_metadata_cache.c
${RDKAFKA_SOURCE_DIR}/rdkafka_mock.c
${RDKAFKA_SOURCE_DIR}/rdkafka_mock_handlers.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msg.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_reader.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_writer.c


@@ -75,8 +75,18 @@
#define HAVE_STRNDUP 1
// strerror_r
#define HAVE_STRERROR_R 1
#ifdef __APPLE__
// pthread_setname_np
#define HAVE_PTHREAD_SETNAME_DARWIN 1
#if (__ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__ <= 101400)
#define _TTHREAD_EMULATE_TIMESPEC_GET_
#endif
#else
// pthread_setname_gnu
#define HAVE_PTHREAD_SETNAME_GNU 1
#endif
// python
//#define HAVE_PYTHON 1
// disable C11 threads for compatibility with old libc
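For reference, the HAVE_PTHREAD_SETNAME_DARWIN / HAVE_PTHREAD_SETNAME_GNU defines above correspond to the two incompatible signatures of pthread_setname_np. A minimal sketch of the difference (hypothetical helper, not part of this patch; glibc exposes the GNU variant under _GNU_SOURCE, which g++ defines by default):

#include <pthread.h>
#include <thread>

// Darwin's pthread_setname_np() names only the calling thread;
// the GNU variant takes an explicit thread handle.
static void setCurrentThreadName(const char * name)
{
#if defined(__APPLE__)
    pthread_setname_np(name);                   // Darwin signature
#else
    pthread_setname_np(pthread_self(), name);   // GNU/Linux signature
#endif
}

int main()
{
    std::thread t([] { setCurrentThreadName("worker"); });
    t.join();
}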

2
contrib/poco vendored

@@ -1 +1 @@
Subproject commit d478f62bd93c9cd14eb343756ef73a4ae622ddf5
Subproject commit d805cf5ca4cf8bdc642261cfcbe7a0a241cb7298


@@ -575,6 +575,7 @@ void HTTPHandler::processQuery(
try
{
char b;
//FIXME looks like MSG_DONTWAIT is useless because of POCO_BROKEN_TIMEOUTS
int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK);
if (status == 0)
context.killCurrentQuery();
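For context: the check above relies on a common POSIX idiom, where a peeking, non-blocking read returning zero bytes means the peer performed an orderly shutdown. A minimal sketch with raw sockets (peerDisconnected is a hypothetical helper, not part of the patch):

#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

// Hypothetical helper: true if the peer has closed the connection.
static bool peerDisconnected(int fd)
{
    char b;
    // MSG_PEEK leaves the byte in the buffer; MSG_DONTWAIT makes the call
    // non-blocking, so "no data yet" shows up as -1/EAGAIN, not as a hang.
    return recv(fd, &b, 1, MSG_DONTWAIT | MSG_PEEK) == 0;  // 0 = orderly shutdown
}

int main()
{
    int sv[2];
    socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
    std::printf("before close: %d\n", peerDisconnected(sv[0]));  // 0: peer alive
    close(sv[1]);
    std::printf("after close:  %d\n", peerDisconnected(sv[0]));  // 1: peer gone
}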


@@ -2,6 +2,7 @@
#include <Common/config.h>
#include <Poco/Net/TCPServerConnection.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/CurrentMetrics.h>
#include <Core/MySQLProtocol.h>
#include "IServer.h"
@@ -9,6 +10,11 @@
#include <Poco/Net/SecureStreamSocket.h>
#endif
namespace CurrentMetrics
{
extern const Metric MySQLConnection;
}
namespace DB
{
/// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client.
@@ -20,6 +26,8 @@ public:
void run() final;
private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::MySQLConnection};
/// Enables SSL, if client requested.
void finishHandshake(MySQLProtocol::HandshakeResponse &);


@@ -76,6 +76,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
}
else
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
response.send() << message.rdbuf();
}
}


@@ -578,6 +578,25 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->initializeTraceCollector();
#endif
/// Describe multiple reasons when query profiler cannot work.
#if !USE_UNWIND
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they cannot work without bundled unwind (stack unwinding) library.");
#endif
#if WITH_COVERAGE
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they work extremely slow with test coverage.");
#endif
#if defined(SANITIZER)
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they cannot work under sanitizers"
" when two different stack unwinding methods will interfere with each other.");
#endif
if (!hasPHDRCache())
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they require PHDR cache to be created"
" (otherwise the function 'dl_iterate_phdr' is not lock free and not async-signal safe).");
global_context->setCurrentDatabase(default_database);
if (has_zookeeper && config().has("distributed_ddl"))


@@ -900,6 +900,10 @@ void TCPHandler::receiveQuery()
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
}
else
{
query_context->switchRowPolicy();
}
}
/// Per query settings.


@@ -185,7 +185,7 @@
<mlock_executable>false</mlock_executable>
<!-- Configuration of clusters that could be used in Distributed tables.
https://clickhouse.yandex/docs/en/table_engines/distributed/
https://clickhouse.tech/docs/en/operations/table_engines/distributed/
-->
<remote_servers incl="clickhouse_remote_servers" >
<!-- Test only shard config for testing distributed storage -->


@@ -74,7 +74,8 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
current_resolved_address = DNSResolver::instance().resolveAddress(host, port);
socket->connect(*current_resolved_address, timeouts.connection_timeout);
const auto & connection_timeout = static_cast<bool>(secure) ? timeouts.secure_connection_timeout : timeouts.connection_timeout;
socket->connect(*current_resolved_address, connection_timeout);
socket->setReceiveTimeout(timeouts.receive_timeout);
socket->setSendTimeout(timeouts.send_timeout);
socket->setNoDelay(true);


@@ -440,6 +440,8 @@ XMLDocumentPtr ConfigProcessor::processConfig(
zkutil::ZooKeeperNodeCache * zk_node_cache,
const zkutil::EventPtr & zk_changed_event)
{
LOG_DEBUG(log, "Processing configuration file '" + path + "'.");
XMLDocumentPtr config = dom_parser.parse(path);
std::vector<std::string> contributing_files;
@@ -449,6 +451,8 @@ XMLDocumentPtr ConfigProcessor::processConfig(
{
try
{
LOG_DEBUG(log, "Merging configuration file '" + merge_file + "'.");
XMLDocumentPtr with = dom_parser.parse(merge_file);
merge(config, with);
contributing_files.push_back(merge_file);
@@ -484,6 +488,8 @@ XMLDocumentPtr ConfigProcessor::processConfig(
}
if (!include_from_path.empty())
{
LOG_DEBUG(log, "Including configuration file '" + include_from_path + "'.");
contributing_files.push_back(include_from_path);
include_from = dom_parser.parse(include_from_path);
}
@@ -613,6 +619,7 @@ void ConfigProcessor::savePreprocessedConfig(const LoadedConfig & loaded_config,
Poco::File(preprocessed_path_parent).createDirectories();
}
DOMWriter().writeNode(preprocessed_path, loaded_config.preprocessed_xml);
LOG_DEBUG(log, "Saved preprocessed configuration to '" << preprocessed_path << "'.");
}
catch (Poco::Exception & e)
{


@@ -15,7 +15,8 @@
M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \
M(DistributedSend, "Number of connections to remote servers sending data that was INSERTed into Distributed tables. Both synchronous and asynchronous mode.") \
M(QueryPreempted, "Number of queries that are stopped and waiting due to 'priority' setting.") \
M(TCPConnection, "Number of connections to TCP server (clients with native interface)") \
M(TCPConnection, "Number of connections to TCP server (clients with native interface), also included server-server distributed query connections") \
M(MySQLConnection, "Number of client connections using MySQL protocol") \
M(HTTPConnection, "Number of connections to HTTP server") \
M(InterserverConnection, "Number of connections from other replicas to fetch parts") \
M(OpenFileForRead, "Number of files open for reading") \


@@ -95,6 +95,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
* with_stacktrace - prints stack trace for DB::Exception.
* check_embedded_stacktrace - if DB::Exception has embedded stacktrace then
* only this stack trace will be printed.
* with_extra_info - add information about the filesystem in case of "No space left on device" and similar.
*/
std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded_stacktrace = false,
bool with_extra_info = true);


@@ -1,12 +1,17 @@
#include <cstdlib>
#include "MemoryTracker.h"
#include <common/likely.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <common/likely.h>
#include <common/logger_useful.h>
#include <ext/singleton.h>
#include <atomic>
#include <cmath>
#include <cstdlib>
namespace DB
@@ -73,7 +78,7 @@ void MemoryTracker::alloc(Int64 size)
return;
/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
@@ -81,7 +86,8 @@ void MemoryTracker::alloc(Int64 size)
if (metric != CurrentMetrics::end())
CurrentMetrics::add(metric, size);
Int64 current_limit = limit.load(std::memory_order_relaxed);
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
/// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
/// In this case, it doesn't matter.
@@ -98,12 +104,19 @@ void MemoryTracker::alloc(Int64 size)
message << " " << description;
message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
if (unlikely(current_limit && will_be > current_limit))
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
auto no_track = blocker.cancel();
ext::Singleton<DB::TraceCollector>()->collect(size);
setOrRaiseProfilerLimit(current_profiler_limit + Int64(std::ceil((will_be - current_profiler_limit) / profiler_step)) * profiler_step);
}
if (unlikely(current_hard_limit && will_be > current_hard_limit))
{
free(size);
@@ -116,7 +129,7 @@ void MemoryTracker::alloc(Int64 size)
message << " " << description;
message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
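A few lines up, the profiler threshold is advanced to the next multiple of profiler_step past the new usage, so a single large allocation can skip several steps but still fires exactly one trace. A standalone sketch of that rounding, with illustrative values and explicit floating-point ceil (not the patch's exact expression):

#include <cmath>
#include <cstdint>
#include <cstdio>

int main()
{
    int64_t profiler_step = 4 << 20;           // dump a trace every 4 MiB
    int64_t current_profiler_limit = 8 << 20;  // threshold that just fired
    int64_t will_be = 13 << 20;                // usage after this allocation

    // 8 MiB + ceil(5 MiB / 4 MiB) * 4 MiB = 8 MiB + 2 * 4 MiB = 16 MiB:
    // the next threshold is the first step boundary past will_be.
    int64_t next = current_profiler_limit
        + int64_t(std::ceil(double(will_be - current_profiler_limit) / double(profiler_step))) * profiler_step;
    std::printf("next profiler limit: %lld MiB\n", (long long)(next >> 20));
    return 0;
}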
@@ -174,7 +187,8 @@ void MemoryTracker::resetCounters()
{
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
limit.store(0, std::memory_order_relaxed);
hard_limit.store(0, std::memory_order_relaxed);
profiler_limit.store(0, std::memory_order_relaxed);
}
@@ -187,11 +201,20 @@ void MemoryTracker::reset()
}
void MemoryTracker::setOrRaiseLimit(Int64 value)
void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = limit.load(std::memory_order_relaxed);
while (old_value < value && !limit.compare_exchange_weak(old_value, value))
Int64 old_value = hard_limit.load(std::memory_order_relaxed);
while (old_value < value && !hard_limit.compare_exchange_weak(old_value, value))
;
}
void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = profiler_limit.load(std::memory_order_relaxed);
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
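Both setOrRaiseHardLimit and setOrRaiseProfilerLimit use the same lock-free "set to maximum" pattern: load, then CAS only while the stored value is smaller. A generic sketch of the idiom (illustrative, not the patch's exact code):

#include <atomic>
#include <cstdint>
#include <cstdio>

// Atomically raise `target` to at least `value`; never lowers it.
// compare_exchange_weak reloads old_value on failure, so the loop exits
// as soon as another thread has already stored something >= value.
static void setOrRaise(std::atomic<int64_t> & target, int64_t value)
{
    int64_t old_value = target.load(std::memory_order_relaxed);
    while (old_value < value
           && !target.compare_exchange_weak(old_value, value, std::memory_order_relaxed))
        ;
}

int main()
{
    std::atomic<int64_t> limit{100};
    setOrRaise(limit, 50);   // no-op: 50 < 100
    setOrRaise(limit, 200);  // raises the limit to 200
    std::printf("%lld\n", (long long)limit.load());
}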
@@ -207,7 +230,7 @@ namespace CurrentMemoryTracker
if (untracked > untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be usefull for enlarge Exception message in rethrow logic.
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
@@ -218,10 +241,7 @@ namespace CurrentMemoryTracker
void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
addition > 0 ? alloc(addition) : free(-addition);
}
void free(Int64 size)


@@ -15,7 +15,10 @@ class MemoryTracker
{
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> limit {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};
Int64 profiler_step = 0;
/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;
@@ -32,7 +35,6 @@ class MemoryTracker
public:
MemoryTracker(VariableContext level_ = VariableContext::Thread) : level(level_) {}
MemoryTracker(Int64 limit_, VariableContext level_ = VariableContext::Thread) : limit(limit_), level(level_) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread) : parent(parent_), level(level_) {}
~MemoryTracker();
@@ -66,21 +68,22 @@ public:
return peak.load(std::memory_order_relaxed);
}
void setLimit(Int64 limit_)
{
limit.store(limit_, std::memory_order_relaxed);
}
/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseLimit(Int64 value);
void setOrRaiseHardLimit(Int64 value);
void setOrRaiseProfilerLimit(Int64 value);
void setFaultProbability(double value)
{
fault_probability = value;
}
void setProfilerStep(Int64 value)
{
profiler_step = value;
}
/// next should be changed only once: from nullptr to some value.
/// NOTE: It is not true in MergeListElement
void setParent(MemoryTracker * elem)


@@ -1,92 +1,38 @@
#include "QueryProfiler.h"
#include <random>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/thread_local_rng.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <Common/Exception.h>
#include <Common/StackTrace.h>
#include <Common/TraceCollector.h>
#include <Common/thread_local_rng.h>
#include <common/StringRef.h>
#include <common/config_common.h>
#include <common/logger_useful.h>
#include <common/phdr_cache.h>
#include <ext/singleton.h>
#include <random>
namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}
namespace DB
{
extern LazyPipeFDs trace_pipe;
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
#if defined(OS_LINUX)
thread_local size_t write_trace_iteration = 0;
#endif
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * info, void * context)
void writeTraceInfo(TraceType trace_type, int /* sig */, siginfo_t * info, void * context)
{
int overrun_count = 0;
#if defined(OS_LINUX)
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitely processing signals instead of doing any useful work.
++write_trace_iteration;
if (info && info->si_overrun > 0)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % info->si_overrun == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun + 1);
return;
}
}
if (info)
overrun_count = info->si_overrun;
#else
UNUSED(info);
#endif
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TimerType) + // timer type
sizeof(UInt64); // thread_id
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
UInt64 thread_id = CurrentThread::get().thread_id;
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);
writeChar(false, out);
writeStringBinary(query_id, out);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(timer_type, out);
writePODBinary(thread_id, out);
out.next();
ext::Singleton<TraceCollector>()->collect(trace_type, stack_trace, overrun_count);
}
[[maybe_unused]] const UInt32 TIMER_PRECISION = 1e9;
@@ -135,11 +81,11 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = pause_signal;
#if defined(__FreeBSD__)
# if defined(__FreeBSD__)
sev._sigev_un._threadid = thread_id;
#else
# else
sev._sigev_un._tid = thread_id;
#endif
# endif
if (timer_create(clock_type, &sev, &timer_id))
{
/// In Google Cloud Run, the function "timer_create" is implemented incorrectly as of 2020-01-25.
@@ -206,7 +152,7 @@ QueryProfilerReal::QueryProfilerReal(const UInt64 thread_id, const UInt32 period
void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Real, sig, info, context);
writeTraceInfo(TraceType::REAL_TIME, sig, info, context);
}
QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
@@ -215,7 +161,7 @@ QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Cpu, sig, info, context);
writeTraceInfo(TraceType::CPU_TIME, sig, info, context);
}
}


@@ -15,12 +15,6 @@ namespace Poco
namespace DB
{
enum class TimerType : UInt8
{
Real,
Cpu,
};
/**
* Query profiler implementation for selected thread.
*


@@ -57,7 +57,7 @@ ShellCommand::~ShellCommand()
std::unique_ptr<ShellCommand> ShellCommand::executeImpl(const char * filename, char * const argv[], bool pipe_stdin_only, bool terminate_in_destructor)
{
/** Here it is written that with a normal call `vfork`, there is a chance of deadlock in multithreaded programs,
* because of the resolving of characters in the shared library
* because of the resolving of symbols in the shared library
* http://www.oracle.com/technetwork/server-storage/solaris10/subprocess-136439.html
* Therefore, separate the resolving of the symbol from the call.
*/


@@ -1,25 +1,38 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Logger.h>
#include <Common/Exception.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Common/Exception.h>
#include <Interpreters/TraceLog.h>
#include <unistd.h>
#include <fcntl.h>
namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}
namespace DB
{
LazyPipeFDs trace_pipe;
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
thread_local size_t write_trace_iteration = 0;
}
namespace ErrorCodes
{
@@ -27,20 +40,15 @@ namespace ErrorCodes
extern const int THREAD_IS_NOT_JOINABLE;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
: log(&Poco::Logger::get("TraceCollector"))
, trace_log(trace_log_)
TraceCollector::TraceCollector()
{
if (trace_log == nullptr)
throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
trace_pipe.open();
pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
trace_pipe.setNonBlocking();
trace_pipe.tryIncreaseSize(1 << 20);
pipe.setNonBlocking();
pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
@@ -48,14 +56,101 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined");
LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined");
else
{
TraceCollector::notifyToStop();
stop();
thread.join();
}
trace_pipe.close();
pipe.close();
}
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, int overrun_count)
{
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitely processing signals instead of doing any useful work.
++write_trace_iteration;
if (overrun_count)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % overrun_count == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count + 1);
return;
}
}
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TraceType) + // trace type
sizeof(UInt64) + // thread_id
sizeof(UInt64); // size
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
auto thread_id = CurrentThread::get().thread_id;
writeChar(false, out);
writeStringBinary(query_id, out);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(trace_type, out);
writePODBinary(thread_id, out);
writePODBinary(UInt64(0), out);
out.next();
}
void TraceCollector::collect(UInt64 size)
{
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TraceType) + // trace type
sizeof(UInt64) + // thread_id
sizeof(UInt64); // size
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
auto thread_id = CurrentThread::get().thread_id;
writeChar(false, out);
writeStringBinary(query_id, out);
const auto & stack_trace = StackTrace();
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(TraceType::MEMORY, out);
writePODBinary(thread_id, out);
writePODBinary(size, out);
out.next();
}
/**
@@ -68,16 +163,16 @@ TraceCollector::~TraceCollector()
* NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
* before stop message.
*/
void TraceCollector::notifyToStop()
void TraceCollector::stop()
{
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
WriteBufferFromFileDescriptor out(pipe.fds_rw[1]);
writeChar(true, out);
out.next();
}
void TraceCollector::run()
{
ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
ReadBufferFromFileDescriptor in(pipe.fds_rw[0]);
while (true)
{
@@ -89,27 +184,33 @@ void TraceCollector::run()
std::string query_id;
readStringBinary(query_id, in);
UInt8 size = 0;
readIntBinary(size, in);
UInt8 trace_size = 0;
readIntBinary(trace_size, in);
Array trace;
trace.reserve(size);
trace.reserve(trace_size);
for (size_t i = 0; i < size; i++)
for (size_t i = 0; i < trace_size; i++)
{
uintptr_t addr = 0;
readPODBinary(addr, in);
trace.emplace_back(UInt64(addr));
}
TimerType timer_type;
readPODBinary(timer_type, in);
TraceType trace_type;
readPODBinary(trace_type, in);
UInt64 thread_id;
readPODBinary(thread_id, in);
TraceLogElement element{std::time(nullptr), timer_type, thread_id, query_id, trace};
trace_log->add(element);
UInt64 size;
readPODBinary(size, in);
if (trace_log)
{
TraceLogElement element{std::time(nullptr), trace_type, thread_id, query_id, trace, size};
trace_log->add(element);
}
}
}
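The collector's transport is a plain pipe: producers (including signal handlers) write one fixed-size, self-describing record to the non-blocking write end, and the background thread parses records off the read end; a record with the stop flag set shuts the loop down. A stripped-down sketch of that pattern with POSIX primitives (simplified record layout, not the patch's wire format):

#include <fcntl.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <thread>

// One fixed-size record per event: a stop flag plus a payload.
struct Record
{
    bool stop;
    uint64_t thread_id;
    uint64_t size;  // e.g. allocation size for MEMORY traces
};

int main()
{
    int fds[2];
    if (pipe(fds) != 0)
        return 1;
    // Non-blocking write end: a producer that cannot drain the pipe
    // (e.g. a signal handler interrupting the reader) must not deadlock;
    // records are dropped instead.
    fcntl(fds[1], F_SETFL, O_NONBLOCK);

    std::thread collector([&]
    {
        Record rec{};
        // Writes of <= PIPE_BUF bytes are atomic, so each read below
        // returns one complete record.
        while (read(fds[0], &rec, sizeof(rec)) == (ssize_t)sizeof(rec) && !rec.stop)
            std::printf("trace: thread %llu, size %llu\n",
                        (unsigned long long)rec.thread_id,
                        (unsigned long long)rec.size);
    });

    Record r{false, 42, 1 << 20};
    if (write(fds[1], &r, sizeof(r)) < 0)          // producer side
        perror("write");
    Record stop{true, 0, 0};
    if (write(fds[1], &stop, sizeof(stop)) < 0)    // same trick as TraceCollector::stop()
        perror("write");

    collector.join();
    close(fds[0]);
    close(fds[1]);
}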


@@ -1,7 +1,10 @@
#pragma once
#include "Common/PipeFDs.h"
#include <Common/ThreadPool.h>
class StackTrace;
namespace Poco
{
class Logger;
@@ -12,21 +15,31 @@ namespace DB
class TraceLog;
enum class TraceType : UInt8
{
REAL_TIME,
CPU_TIME,
MEMORY,
};
class TraceCollector
{
public:
TraceCollector();
~TraceCollector();
void setTraceLog(const std::shared_ptr<TraceLog> & trace_log_) { trace_log = trace_log_; }
void collect(TraceType type, const StackTrace & stack_trace, int overrun_count = 0);
void collect(UInt64 size);
private:
Poco::Logger * log;
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
LazyPipeFDs pipe;
void run();
static void notifyToStop();
public:
TraceCollector(std::shared_ptr<TraceLog> & trace_log_);
~TraceCollector();
void stop();
};
}


@@ -1,14 +1,16 @@
#if defined(OS_LINUX)
#include <malloc.h>
#elif defined(OS_DARWIN)
#include <malloc/malloc.h>
#endif
#include <new>
#include <common/config_common.h>
#include <common/memory.h>
#include <Common/MemoryTracker.h>
#include <iostream>
#include <new>
#if defined(OS_LINUX)
# include <malloc.h>
#elif defined(OS_DARWIN)
# include <malloc/malloc.h>
#endif
/// Replace default new/delete with memory tracking versions.
/// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new
/// https://en.cppreference.com/w/cpp/memory/new/operator_delete
@@ -29,7 +31,7 @@ ALWAYS_INLINE void trackMemory(std::size_t size)
#endif
}
ALWAYS_INLINE bool trackMemoryNoExept(std::size_t size) noexcept
ALWAYS_INLINE bool trackMemoryNoExcept(std::size_t size) noexcept
{
try
{
@@ -54,11 +56,11 @@ ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [
#else
if (size)
CurrentMemoryTracker::free(size);
#ifdef _GNU_SOURCE
# ifdef _GNU_SOURCE
/// It's an inaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size.
else
CurrentMemoryTracker::free(malloc_usable_size(ptr));
#endif
# endif
#endif
}
catch (...)
@@ -83,14 +85,14 @@ void * operator new[](std::size_t size)
void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
if (likely(Memory::trackMemoryNoExcept(size)))
return Memory::newNoExept(size);
return nullptr;
}
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
if (likely(Memory::trackMemoryNoExcept(size)))
return Memory::newNoExept(size);
return nullptr;
}


@@ -0,0 +1,17 @@
#pragma once
#include <Interpreters/Context.h>
inline DB::Context createContext()
{
auto context = DB::Context::createGlobal();
context.makeGlobalContext();
context.setPath("./");
return context;
}
inline const DB::Context & getContext()
{
static DB::Context global_context = createContext();
return global_context;
}


@@ -6,6 +6,7 @@
#define DBMS_DEFAULT_HTTP_PORT 8123
#define DBMS_DEFAULT_CONNECT_TIMEOUT_SEC 10
#define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS 50
#define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_SECURE_MS 100
#define DBMS_DEFAULT_SEND_TIMEOUT_SEC 300
#define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
/// Timeout for synchronous request-result protocol call (like Ping or TablesStatus).


@@ -62,6 +62,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, interactive_delay, 100000, "The interval in microseconds to check if the request is cancelled, and to send progress info.", 0) \
M(SettingSeconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.", 0) \
M(SettingMilliseconds, connect_timeout_with_failover_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS, "Connection timeout for selecting first healthy replica.", 0) \
M(SettingMilliseconds, connect_timeout_with_failover_secure_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_SECURE_MS, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(SettingSeconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \
M(SettingSeconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \
M(SettingSeconds, tcp_keep_alive_timeout, 0, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
@@ -331,6 +332,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.", 0) \
M(SettingUInt64, memory_profiler_step, 0, "Every number of bytes the memory profiler will dump the allocating stacktrace. Zero means disabled memory profiler.", 0) \
\
M(SettingUInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.", 0) \
M(SettingUInt64, max_network_bytes, 0, "The maximum number of bytes (compressed) to receive or transmit over the network for execution of the query.", 0) \
@@ -382,7 +384,6 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, allow_experimental_live_view, false, "Enable LIVE VIEW. Not mature enough.", 0) \
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.", 0) \
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.", 0) \
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.", 0) \
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
@@ -406,6 +407,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, merge_tree_uniform_read_distribution, true, "Obsolete setting, does nothing. Will be removed after 2020-05-20", 0) \
M(SettingUInt64, mark_cache_min_lifetime, 0, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(SettingUInt64, max_parser_depth, 1000, "Maximum parser depth.", 0) \
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)
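The settings above are declared with an X-macro: each M(type, name, default, description, flags) entry expands differently depending on how M is defined at the expansion site. A toy sketch of the technique (not ClickHouse's actual macros):

#include <cstdio>

// One list, many expansions: the caller defines M before expanding the list.
#define LIST_OF_SETTINGS(M) \
    M(int, max_threads, 8, "Maximum number of threads") \
    M(int, max_memory_usage, 0, "Zero means unlimited")

// Expansion 1: declare the fields with their defaults.
struct Settings
{
#define DECLARE_FIELD(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    LIST_OF_SETTINGS(DECLARE_FIELD)
#undef DECLARE_FIELD
};

int main()
{
    Settings s;
    // Expansion 2: print name/value pairs from the very same list.
#define PRINT_FIELD(TYPE, NAME, DEFAULT, DESCRIPTION) \
    std::printf("%s = %d  (%s)\n", #NAME, s.NAME, DESCRIPTION);
    LIST_OF_SETTINGS(PRINT_FIELD)
#undef PRINT_FIELD
}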


@@ -1,6 +1,7 @@
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
#include <Parsers/formatAST.h>
#include <Interpreters/ExpressionActions.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>


@@ -34,7 +34,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
unit.is_last = !have_more_data;
unit.status = READY_TO_PARSE;
scheduleParserThreadForUnitWithNumber(current_unit_number);
scheduleParserThreadForUnitWithNumber(segmentator_ticket_number);
++segmentator_ticket_number;
if (!have_more_data)
@@ -49,12 +49,13 @@
}
}
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number)
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_ticket_number)
{
try
{
setThreadName("ChunkParser");
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
/*
@@ -64,9 +65,9 @@ void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_n
* can use it from multiple threads simultaneously.
*/
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
auto parser = std::make_unique<InputStreamFromInputFormat>(
input_processor_creator(read_buffer, header,
row_input_format_params, format_settings));
auto format = input_processor_creator(read_buffer, header, row_input_format_params, format_settings);
format->setCurrentUnitNumber(current_ticket_number);
auto parser = std::make_unique<InputStreamFromInputFormat>(std::move(format));
unit.block_ext.block.clear();
unit.block_ext.block_missing_values.clear();


@@ -213,9 +213,9 @@ private:
std::deque<ProcessingUnit> processing_units;
void scheduleParserThreadForUnitWithNumber(size_t unit_number)
void scheduleParserThreadForUnitWithNumber(size_t ticket_number)
{
pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, unit_number));
pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, ticket_number));
}
void finishAndWait()
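The ticket scheme turns the fixed pool of processing units into a ring: tickets grow monotonically and map to a slot via ticket % processing_units.size(), as in parserThreadFunction above. A toy sketch of the indexing (illustrative only):

#include <cstdio>
#include <vector>

int main()
{
    // A fixed ring of processing units, as in the stream above.
    std::vector<const char *> units(4, "idle");

    // Tickets grow without bound; the slot is always ticket mod ring size,
    // so ticket 0 and ticket 4 reuse the same unit once it has been drained.
    for (size_t ticket = 0; ticket < 10; ++ticket)
    {
        size_t unit = ticket % units.size();
        std::printf("ticket %zu -> unit %zu\n", ticket, unit);
    }
}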


@@ -8,7 +8,6 @@
#include <Parsers/ASTInsertQuery.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/StorageValues.h>
@@ -51,8 +50,10 @@
ASTPtr query;
BlockOutputStreamPtr out;
if (auto * materialized_view = dynamic_cast<const StorageMaterializedView *>(dependent_table.get()))
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
addTableLock(materialized_view->lockStructureForShare(true, context.getInitialQueryId()));
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
query = materialized_view->getInnerQuery();


@@ -252,20 +252,23 @@ void DatabaseOrdinary::alterTable(
ast->replace(ast_create_query.select, metadata.select);
}
ASTStorage & storage_ast = *ast_create_query.storage;
/// ORDER BY may change, but cannot appear, it's required construction
if (metadata.order_by_ast && storage_ast.order_by)
storage_ast.set(storage_ast.order_by, metadata.order_by_ast);
/// MaterializedView is one type of CREATE query without storage.
if (ast_create_query.storage)
{
ASTStorage & storage_ast = *ast_create_query.storage;
/// ORDER BY may change, but cannot appear, it's required construction
if (metadata.order_by_ast && storage_ast.order_by)
storage_ast.set(storage_ast.order_by, metadata.order_by_ast);
if (metadata.primary_key_ast)
storage_ast.set(storage_ast.primary_key, metadata.primary_key_ast);
if (metadata.primary_key_ast)
storage_ast.set(storage_ast.primary_key, metadata.primary_key_ast);
if (metadata.ttl_for_table_ast)
storage_ast.set(storage_ast.ttl_table, metadata.ttl_for_table_ast);
if (metadata.settings_ast)
storage_ast.set(storage_ast.settings, metadata.settings_ast);
if (metadata.ttl_for_table_ast)
storage_ast.set(storage_ast.ttl_table, metadata.ttl_for_table_ast);
if (metadata.settings_ast)
storage_ast.set(storage_ast.settings, metadata.settings_ast);
}
statement = getObjectDefinitionFromCreateQuery(ast);
{


@@ -74,6 +74,11 @@ AttributeUnderlyingType getAttributeUnderlyingType(const std::string & type)
return AttributeUnderlyingType::utDecimal128;
}
// Temporary hack to allow arrays in keys, since they are never retrieved for polygon dictionaries.
// TODO: This should be fixed by fully supporting arrays in dictionaries.
if (type.find("Array") == 0)
return AttributeUnderlyingType::utString;
throw Exception{"Unknown type " + type, ErrorCodes::UNKNOWN_TYPE};
}


@@ -0,0 +1,748 @@
#include <ext/map.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include "PolygonDictionary.h"
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
#include <numeric>
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
}
IPolygonDictionary::IPolygonDictionary(
const std::string & database_,
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
InputType input_type_,
PointType point_type_)
: database(database_)
, name(name_)
, full_name{database_.empty() ? name_ : (database_ + "." + name_)}
, dict_struct(dict_struct_)
, source_ptr(std::move(source_ptr_))
, dict_lifetime(dict_lifetime_)
, input_type(input_type_)
, point_type(point_type_)
{
createAttributes();
loadData();
}
const std::string & IPolygonDictionary::getDatabase() const
{
return database;
}
const std::string & IPolygonDictionary::getName() const
{
return name;
}
const std::string & IPolygonDictionary::getFullName() const
{
return full_name;
}
std::string IPolygonDictionary::getTypeName() const
{
return "Polygon";
}
std::string IPolygonDictionary::getKeyDescription() const
{
return dict_struct.getKeyDescription();
}
size_t IPolygonDictionary::getBytesAllocated() const
{
return bytes_allocated;
}
size_t IPolygonDictionary::getQueryCount() const
{
return query_count.load(std::memory_order_relaxed);
}
double IPolygonDictionary::getHitRate() const
{
return 1.0;
}
size_t IPolygonDictionary::getElementCount() const
{
return element_count;
}
double IPolygonDictionary::getLoadFactor() const
{
return 1.0;
}
const IDictionarySource * IPolygonDictionary::getSource() const
{
return source_ptr.get();
}
const DictionaryLifetime & IPolygonDictionary::getLifetime() const
{
return dict_lifetime;
}
const DictionaryStructure & IPolygonDictionary::getStructure() const
{
return dict_struct;
}
bool IPolygonDictionary::isInjective(const std::string &) const
{
return false;
}
BlockInputStreamPtr IPolygonDictionary::getBlockInputStream(const Names &, size_t) const
{
// TODO: In order for this to work one would first have to support retrieving arrays from dictionaries.
// I believe this is a separate task done by some other people.
throw Exception{"Reading the dictionary is not allowed", ErrorCodes::UNSUPPORTED_METHOD};
}
template <typename T>
void IPolygonDictionary::appendNullValueImpl(const Field & null_value)
{
null_values.emplace_back(T(null_value.get<NearestFieldType<T>>()));
}
void IPolygonDictionary::appendNullValue(AttributeUnderlyingType type, const Field & null_value)
{
switch (type)
{
case AttributeUnderlyingType::utUInt8:
appendNullValueImpl<UInt8>(null_value);
break;
case AttributeUnderlyingType::utUInt16:
appendNullValueImpl<UInt16>(null_value);
break;
case AttributeUnderlyingType::utUInt32:
appendNullValueImpl<UInt32>(null_value);
break;
case AttributeUnderlyingType::utUInt64:
appendNullValueImpl<UInt64>(null_value);
break;
case AttributeUnderlyingType::utUInt128:
appendNullValueImpl<UInt128>(null_value);
break;
case AttributeUnderlyingType::utInt8:
appendNullValueImpl<Int8>(null_value);
break;
case AttributeUnderlyingType::utInt16:
appendNullValueImpl<Int16>(null_value);
break;
case AttributeUnderlyingType::utInt32:
appendNullValueImpl<Int32>(null_value);
break;
case AttributeUnderlyingType::utInt64:
appendNullValueImpl<Int64>(null_value);
break;
case AttributeUnderlyingType::utFloat32:
appendNullValueImpl<Float32>(null_value);
break;
case AttributeUnderlyingType::utFloat64:
appendNullValueImpl<Float64>(null_value);
break;
case AttributeUnderlyingType::utDecimal32:
appendNullValueImpl<Decimal32>(null_value);
break;
case AttributeUnderlyingType::utDecimal64:
appendNullValueImpl<Decimal64>(null_value);
break;
case AttributeUnderlyingType::utDecimal128:
appendNullValueImpl<Decimal128>(null_value);
break;
case AttributeUnderlyingType::utString:
appendNullValueImpl<String>(null_value);
break;
}
}
void IPolygonDictionary::createAttributes()
{
attributes.resize(dict_struct.attributes.size());
for (size_t i = 0; i < dict_struct.attributes.size(); ++i)
{
const auto & attr = dict_struct.attributes[i];
attribute_index_by_name.emplace(attr.name, i);
appendNullValue(attr.underlying_type, attr.null_value);
if (attr.hierarchical)
throw Exception{name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(),
ErrorCodes::TYPE_MISMATCH};
}
}
void IPolygonDictionary::blockToAttributes(const DB::Block &block)
{
const auto rows = block.rows();
element_count += rows;
for (size_t i = 0; i < attributes.size(); ++i)
{
const auto & column = block.safeGetByPosition(i + 1);
if (attributes[i])
{
MutableColumnPtr mutated = std::move(*attributes[i]).mutate();
mutated->insertRangeFrom(*column.column, 0, column.column->size());
attributes[i] = std::move(mutated);
}
else
attributes[i] = column.column;
}
/** Multi-polygons could cause bigger sizes, but this is better than nothing. */
polygons.reserve(polygons.size() + rows);
ids.reserve(ids.size() + rows);
const auto & key = block.safeGetByPosition(0).column;
extractPolygons(key);
}
void IPolygonDictionary::loadData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
blockToAttributes(block);
stream->readSuffix();
for (auto & polygon : polygons)
bg::correct(polygon);
}
void IPolygonDictionary::calculateBytesAllocated()
{
// TODO:: Account for key.
for (const auto & column : attributes)
bytes_allocated += column->allocatedBytes();
}
std::vector<IPolygonDictionary::Point> IPolygonDictionary::extractPoints(const Columns &key_columns)
{
if (key_columns.size() != 2)
throw Exception{"Expected two columns of coordinates", ErrorCodes::BAD_ARGUMENTS};
const auto column_x = typeid_cast<const ColumnVector<Float64>*>(key_columns[0].get());
const auto column_y = typeid_cast<const ColumnVector<Float64>*>(key_columns[1].get());
if (!column_x || !column_y)
throw Exception{"Expected columns of Float64", ErrorCodes::TYPE_MISMATCH};
const auto rows = key_columns.front()->size();
std::vector<Point> result;
result.reserve(rows);
for (const auto row : ext::range(0, rows))
result.emplace_back(column_x->getElement(row), column_y->getElement(row));
return result;
}
void IPolygonDictionary::has(const Columns &key_columns, const DataTypes &, PaddedPODArray<UInt8> &out) const
{
size_t row = 0;
for (const auto & pt : extractPoints(key_columns))
{
size_t trash = 0;
out[row] = find(pt, trash);
++row;
}
query_count.fetch_add(row, std::memory_order_relaxed);
}
size_t IPolygonDictionary::getAttributeIndex(const std::string & attribute_name) const
{
const auto it = attribute_index_by_name.find(attribute_name);
if (it == attribute_index_by_name.end())
throw Exception{"No such attribute: " + attribute_name, ErrorCodes::BAD_ARGUMENTS};
return it->second;
}
#define DECLARE(TYPE) \
void IPolygonDictionary::get##TYPE( \
const std::string & attribute_name, const Columns & key_columns, const DataTypes &, ResultArrayType<TYPE> & out) const \
{ \
const auto ind = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[ind].underlying_type, AttributeUnderlyingType::ut##TYPE); \
\
const auto null_value = std::get<TYPE>(null_values[ind]); \
\
getItemsImpl<TYPE, TYPE>( \
ind, \
key_columns, \
[&](const size_t row, const auto value) { out[row] = value; }, \
[&](const size_t) { return null_value; }); \
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void IPolygonDictionary::getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes &, ColumnString * out) const
{
const auto ind = getAttributeIndex(attribute_name);
checkAttributeType(name, attribute_name, dict_struct.attributes[ind].underlying_type, AttributeUnderlyingType::utString);
const auto & null_value = StringRef{std::get<String>(null_values[ind])};
getItemsImpl<String, StringRef>(
ind,
key_columns,
[&](const size_t, const StringRef & value) { out->insertData(value.data, value.size); },
[&](const size_t) { return null_value; });
}
#define DECLARE(TYPE) \
void IPolygonDictionary::get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes &, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const \
{ \
const auto ind = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[ind].underlying_type, AttributeUnderlyingType::ut##TYPE); \
\
getItemsImpl<TYPE, TYPE>( \
ind, \
key_columns, \
[&](const size_t row, const auto value) { out[row] = value; }, \
[&](const size_t row) { return def[row]; }); \
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void IPolygonDictionary::getString(
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes &,
const ColumnString * const def,
ColumnString * const out) const
{
const auto ind = getAttributeIndex(attribute_name);
checkAttributeType(name, attribute_name, dict_struct.attributes[ind].underlying_type, AttributeUnderlyingType::utString);
getItemsImpl<String, StringRef>(
ind,
key_columns,
[&](const size_t, const StringRef value) { out->insertData(value.data, value.size); },
[&](const size_t row) { return def->getDataAt(row); });
}
#define DECLARE(TYPE) \
void IPolygonDictionary::get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes &, \
const TYPE def, \
ResultArrayType<TYPE> & out) const \
{ \
const auto ind = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[ind].underlying_type, AttributeUnderlyingType::ut##TYPE); \
\
getItemsImpl<TYPE, TYPE>( \
ind, key_columns, [&](const size_t row, const auto value) { out[row] = value; }, [&](const size_t) { return def; }); \
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void IPolygonDictionary::getString(
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes &,
const String & def,
ColumnString * const out) const
{
const auto ind = getAttributeIndex(attribute_name);
checkAttributeType(name, attribute_name, dict_struct.attributes[ind].underlying_type, AttributeUnderlyingType::utString);
getItemsImpl<String, StringRef>(
ind,
key_columns,
[&](const size_t, const StringRef value) { out->insertData(value.data, value.size); },
[&](const size_t) { return StringRef{def}; });
}
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultGetter>
void IPolygonDictionary::getItemsImpl(
size_t attribute_ind, const Columns & key_columns, ValueSetter && set_value, DefaultGetter && get_default) const
{
const auto points = extractPoints(key_columns);
using ColVecType = std::conditional_t<IsDecimalNumber<AttributeType>, ColumnDecimal<AttributeType>, ColumnVector<AttributeType>>;
using ColType = std::conditional_t<std::is_same<AttributeType, String>::value, ColumnString, ColVecType>;
const auto column = typeid_cast<const ColType *>(attributes[attribute_ind].get());
if (!column)
throw Exception{"An attribute should be a column of its type", ErrorCodes::BAD_ARGUMENTS};
for (const auto i : ext::range(0, points.size()))
{
size_t id = 0;
const auto found = find(points[i], id);
if (!found)
{
set_value(i, static_cast<OutputType>(get_default(i)));
continue;
}
/// Remap the polygon index to the source row that stores this entry's attributes.
id = ids[id];
if constexpr (std::is_same<AttributeType, String>::value)
set_value(i, static_cast<OutputType>(column->getDataAt(id)));
else
set_value(i, static_cast<OutputType>(column->getElement(id)));
}
query_count.fetch_add(points.size(), std::memory_order_relaxed);
}
namespace
{
struct Offset
{
Offset() = default;
IColumn::Offsets ring_offsets;
IColumn::Offsets polygon_offsets;
IColumn::Offsets multi_polygon_offsets;
IColumn::Offset points_added = 0;
IColumn::Offset current_ring = 0;
IColumn::Offset current_polygon = 0;
IColumn::Offset current_multi_polygon = 0;
Offset& operator++()
{
++points_added;
if (points_added <= ring_offsets[current_ring])
return *this;
++current_ring;
if (current_ring < polygon_offsets[current_polygon])
return *this;
++current_polygon;
if (current_polygon < multi_polygon_offsets[current_multi_polygon])
return *this;
++current_multi_polygon;
return *this;
}
bool atLastPolygonOfMultiPolygon() { return current_polygon + 1 == multi_polygon_offsets[current_multi_polygon]; }
bool atLastRingOfPolygon() { return current_ring + 1 == polygon_offsets[current_polygon]; }
bool atLastPointOfRing() { return points_added == ring_offsets[current_ring]; }
/// A ring needs at least 3 points to enclose a positive area.
bool allRingsHaveAPositiveArea()
{
IColumn::Offset prev_offset = 0;
for (const auto offset : ring_offsets)
{
if (offset - prev_offset < 3)
return false;
prev_offset = offset;
}
return true;
}
};
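/** A hedged worked example for the Offset bookkeeping above (not part of the original source).
 * Offsets are cumulative, as in ClickHouse array columns. For one multi-polygon holding two
 * polygons of one ring each, with 4 and 3 points respectively:
 *     ring_offsets          = {4, 7}
 *     polygon_offsets       = {1, 2}
 *     multi_polygon_offsets = {2}
 * When the 5th point is added, operator++ advances current_ring and current_polygon to 1,
 * while current_multi_polygon stays at 0.
 */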
struct Data
{
std::vector<IPolygonDictionary::Polygon> & dest;
std::vector<size_t> & ids;
/// Appends a new empty polygon; every polygon of one multi-polygon shares the same source row id.
void addPolygon(bool new_multi_polygon = false)
{
dest.emplace_back();
ids.push_back((ids.empty() ? 0 : ids.back() + new_multi_polygon));
}
void addPoint(Float64 x, Float64 y)
{
auto & last_polygon = dest.back();
auto & last_ring = (last_polygon.inners().empty() ? last_polygon.outer() : last_polygon.inners().back());
last_ring.emplace_back(x, y);
}
};
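/** A hedged trace of the id bookkeeping in Data above: for two multi-polygons, the first holding
 * polygons P0 and P1 and the second holding P2, the calls are addPolygon(true) for P0 (ids is
 * empty, so row 0), addPolygon(false) for P1 (still row 0), and addPolygon(true) for P2 (row 1),
 * giving ids = {0, 0, 1}; every polygon maps back to the source row of its multi-polygon.
 */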
void addNewPoint(Float64 x, Float64 y, Data & data, Offset & offset)
{
if (offset.atLastPointOfRing())
{
if (offset.atLastRingOfPolygon())
data.addPolygon(offset.atLastPolygonOfMultiPolygon());
else
{
/** An outer ring is added automatically with a new polygon, thus we need the else statement here.
* This also implies that if we are at this point we have to add an inner ring.
*/
auto & last_polygon = data.dest.back();
last_polygon.inners().emplace_back();
}
}
data.addPoint(x, y);
++offset;
}
const IColumn * unrollMultiPolygons(const ColumnPtr & column, Offset & offset)
{
const auto ptr_multi_polygons = typeid_cast<const ColumnArray*>(column.get());
if (!ptr_multi_polygons)
throw Exception{"Expected a column containing arrays of polygons", ErrorCodes::TYPE_MISMATCH};
offset.multi_polygon_offsets.assign(ptr_multi_polygons->getOffsets());
const auto ptr_polygons = typeid_cast<const ColumnArray*>(&ptr_multi_polygons->getData());
if (!ptr_polygons)
throw Exception{"Expected a column containing arrays of rings when reading polygons", ErrorCodes::TYPE_MISMATCH};
offset.polygon_offsets.assign(ptr_polygons->getOffsets());
const auto ptr_rings = typeid_cast<const ColumnArray*>(&ptr_polygons->getData());
if (!ptr_rings)
throw Exception{"Expected a column containing arrays of points when reading rings", ErrorCodes::TYPE_MISMATCH};
offset.ring_offsets.assign(ptr_rings->getOffsets());
return ptr_rings->getDataPtr().get();
}
const IColumn * unrollSimplePolygons(const ColumnPtr & column, Offset & offset)
{
const auto ptr_polygons = typeid_cast<const ColumnArray*>(column.get());
if (!ptr_polygons)
throw Exception{"Expected a column containing arrays of points", ErrorCodes::TYPE_MISMATCH};
offset.ring_offsets.assign(ptr_polygons->getOffsets());
/// Each simple polygon is a single ring that also forms its own multi-polygon,
/// so both offset arrays are simply 1, 2, 3, ...; the array must be sized before std::iota can fill it.
offset.polygon_offsets.resize(offset.ring_offsets.size());
std::iota(offset.polygon_offsets.begin(), offset.polygon_offsets.end(), 1);
offset.multi_polygon_offsets.assign(offset.polygon_offsets);
return ptr_polygons->getDataPtr().get();
}
void handlePointsReprByArrays(const IColumn * column, Data & data, Offset & offset)
{
const auto ptr_points = typeid_cast<const ColumnArray*>(column);
if (!ptr_points)
throw Exception{"Expected a column containing arrays of point coordinates", ErrorCodes::TYPE_MISMATCH};
const auto ptr_coord = typeid_cast<const ColumnVector<Float64>*>(&ptr_points->getData());
if (!ptr_coord)
throw Exception{"Expected coordinates to be of type Float64", ErrorCodes::TYPE_MISMATCH};
const auto & offsets = ptr_points->getOffsets();
IColumn::Offset prev_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
if (offsets[i] - prev_offset != 2)
throw Exception{"All points should be two-dimensional", ErrorCodes::BAD_ARGUMENTS};
prev_offset = offsets[i];
addNewPoint(ptr_coord->getElement(2 * i), ptr_coord->getElement(2 * i + 1), data, offset);
}
}
void handlePointsReprByTuples(const IColumn * column, Data & data, Offset & offset)
{
const auto ptr_points = typeid_cast<const ColumnTuple*>(column);
if (!ptr_points)
throw Exception{"Expected a column of tuples representing points", ErrorCodes::TYPE_MISMATCH};
if (ptr_points->tupleSize() != 2)
throw Exception{"Points should be two-dimensional", ErrorCodes::BAD_ARGUMENTS};
const auto column_x = typeid_cast<const ColumnVector<Float64>*>(&ptr_points->getColumn(0));
const auto column_y = typeid_cast<const ColumnVector<Float64>*>(&ptr_points->getColumn(1));
if (!column_x || !column_y)
throw Exception{"Expected coordinates to be of type Float64", ErrorCodes::TYPE_MISMATCH};
for (size_t i = 0; i < column_x->size(); ++i)
{
addNewPoint(column_x->getElement(i), column_y->getElement(i), data, offset);
}
}
}
void IPolygonDictionary::extractPolygons(const ColumnPtr & column)
{
Data data = {polygons, ids};
Offset offset;
const IColumn * points_collection = nullptr;
switch (input_type)
{
case InputType::MultiPolygon:
points_collection = unrollMultiPolygons(column, offset);
break;
case InputType::SimplePolygon:
points_collection = unrollSimplePolygons(column, offset);
break;
}
if (!offset.allRingsHaveAPositiveArea())
throw Exception{"Every ring included in a polygon or excluded from it should contain at least 3 points",
ErrorCodes::BAD_ARGUMENTS};
/** Adding the first empty polygon */
data.addPolygon(true);
switch (point_type)
{
case PointType::Array:
handlePointsReprByArrays(points_collection, data, offset);
break;
case PointType::Tuple:
handlePointsReprByTuples(points_collection, data, offset);
break;
}
}
SimplePolygonDictionary::SimplePolygonDictionary(
const std::string & database_,
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
InputType input_type_,
PointType point_type_)
: IPolygonDictionary(database_, name_, dict_struct_, std::move(source_ptr_), dict_lifetime_, input_type_, point_type_)
{
}
std::shared_ptr<const IExternalLoadable> SimplePolygonDictionary::clone() const
{
return std::make_shared<SimplePolygonDictionary>(
this->database,
this->name,
this->dict_struct,
this->source_ptr->clone(),
this->dict_lifetime,
this->input_type,
this->point_type);
}
bool SimplePolygonDictionary::find(const Point & point, size_t & id) const
{
bool found = false;
double area = 0;
for (size_t i = 0; i < (this->polygons).size(); ++i)
{
if (bg::covered_by(point, (this->polygons)[i]))
{
/// Among all polygons covering the point, keep the one with the smallest area.
double new_area = bg::area((this->polygons)[i]);
if (!found || new_area < area)
{
found = true;
id = i;
area = new_area;
}
}
}
return found;
}
void registerDictionaryPolygon(DictionaryFactory & factory)
{
auto create_layout = [=](const std::string &,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
const String database = config.getString(config_prefix + ".database", "");
const String name = config.getString(config_prefix + ".name");
if (!dict_struct.key)
throw Exception{"'key' is required for a dictionary of layout 'polygon'", ErrorCodes::BAD_ARGUMENTS};
if (dict_struct.key->size() != 1)
throw Exception{"The 'key' should consist of a single attribute for a dictionary of layout 'polygon'",
ErrorCodes::BAD_ARGUMENTS};
IPolygonDictionary::InputType input_type;
IPolygonDictionary::PointType point_type;
const auto key_type = (*dict_struct.key)[0].type;
const auto f64 = std::make_shared<DataTypeFloat64>();
const auto multi_polygon_array = DataTypeArray(std::make_shared<DataTypeArray>(std::make_shared<DataTypeArray>(std::make_shared<DataTypeArray>(f64))));
const auto multi_polygon_tuple = DataTypeArray(std::make_shared<DataTypeArray>(std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(std::vector<DataTypePtr>{f64, f64}))));
const auto simple_polygon_array = DataTypeArray(std::make_shared<DataTypeArray>(f64));
const auto simple_polygon_tuple = DataTypeArray(std::make_shared<DataTypeTuple>(std::vector<DataTypePtr>{f64, f64}));
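/// For reference, a sketch of how these key types render in ClickHouse type syntax (assuming
/// the usual DataType names): Array(Array(Array(Array(Float64)))) and
/// Array(Array(Array(Tuple(Float64, Float64)))) for multi-polygons, Array(Array(Float64))
/// and Array(Tuple(Float64, Float64)) for simple polygons.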
if (key_type->equals(multi_polygon_array))
{
input_type = IPolygonDictionary::InputType::MultiPolygon;
point_type = IPolygonDictionary::PointType::Array;
}
else if (key_type->equals(multi_polygon_tuple))
{
input_type = IPolygonDictionary::InputType::MultiPolygon;
point_type = IPolygonDictionary::PointType::Tuple;
}
else if (key_type->equals(simple_polygon_array))
{
input_type = IPolygonDictionary::InputType::SimplePolygon;
point_type = IPolygonDictionary::PointType::Array;
}
else if (key_type->equals(simple_polygon_tuple))
{
input_type = IPolygonDictionary::InputType::SimplePolygon;
point_type = IPolygonDictionary::PointType::Tuple;
}
else
throw Exception{"The key type " + key_type->getName() +
" is not one of the following allowed types for a dictionary of layout 'polygon': " +
multi_polygon_array.getName() + " " +
multi_polygon_tuple.getName() + " " +
simple_polygon_array.getName() + " " +
simple_polygon_tuple.getName() + " ",
ErrorCodes::BAD_ARGUMENTS};
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{name
+ ": elements range_min and range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
return std::make_unique<SimplePolygonDictionary>(database, name, dict_struct, std::move(source_ptr), dict_lifetime, input_type, point_type);
};
factory.registerLayout("polygon", create_layout, true);
}
}

View File

@ -0,0 +1,293 @@
#pragma once
#include <atomic>
#include <variant>
#include <Core/Block.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Common/Arena.h>
#include <boost/geometry.hpp>
#include <boost/geometry/geometries/multi_polygon.hpp>
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
namespace DB
{
namespace bg = boost::geometry;
/** An interface for polygon dictionaries.
* Polygons are read and stored as multi_polygons from boost::geometry in Euclidean coordinates.
* An implementation should inherit from this base class and preprocess the data upon construction if needed.
* It must override the find method of this class which retrieves the polygon containing a single point.
*/
class IPolygonDictionary : public IDictionaryBase
{
public:
/** Controls the different types of polygons allowed as input.
* The structure of a multi-polygon is as follows:
* - A multi-polygon is represented by a nonempty array of polygons.
* - A polygon is represented by a nonempty array of rings. The first element represents the outer ring. Zero
* or more following rings are cut out from the polygon.
* - A ring is represented by a nonempty array of points.
* - A point is represented by its coordinates, stored in one of the structures described below.
* A simple polygon is represented by a one-dimensional array of points, stored in the same way.
*/
enum class InputType
{
MultiPolygon,
SimplePolygon
};
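/** A hedged example of the MultiPolygon shape described above: a single multi-polygon with one
 * polygon whose outer ring is a 10x10 square and whose second ring cuts out a 2x2 hole:
 *     [[[(0, 0), (10, 0), (10, 10), (0, 10)], [(4, 4), (6, 4), (6, 6), (4, 6)]]]
 * With PointType::Array each point would be written as [x, y] instead of a tuple.
 */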
/** Controls the different types allowed for providing the coordinates of points.
* Right now a point can be represented by either an array or a tuple of two Float64 values.
*/
enum class PointType
{
Array,
Tuple,
};
IPolygonDictionary(
const std::string & database_,
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
DictionaryLifetime dict_lifetime_,
InputType input_type_,
PointType point_type_);
const std::string & getDatabase() const override;
const std::string & getName() const override;
const std::string & getFullName() const override;
std::string getTypeName() const override;
std::string getKeyDescription() const;
size_t getBytesAllocated() const override;
size_t getQueryCount() const override;
double getHitRate() const override;
size_t getElementCount() const override;
double getLoadFactor() const override;
const IDictionarySource * getSource() const override;
const DictionaryStructure & getStructure() const override;
const DictionaryLifetime & getLifetime() const override;
bool isInjective(const std::string & attribute_name) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
/** Functions used to retrieve attributes of specific type by key. */
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, const Columns & key_columns, const DataTypes &, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name, const Columns & key_columns, const DataTypes &, ColumnString * out) const;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes &, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes &,
const ColumnString * const def,
ColumnString * const out) const;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes &, \
const TYPE def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes & key_types,
const String & def,
ColumnString * const out) const;
/** Checks whether or not a point can be found in one of the polygons in the dictionary.
* The check is performed for multiple points represented by columns of their x and y coordinates.
* The boolean result is written to out.
*/
// TODO: Refactor the whole dictionary design to perform stronger checks, i.e. make this an override.
void has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray<UInt8> & out) const;
/** A two-dimensional point in Euclidean coordinates. */
using Point = bg::model::point<Float64, 2, bg::cs::cartesian>;
/** A polygon in boost is an outer ring of points with zero or more cut out inner rings. */
using Polygon = bg::model::polygon<Point>;
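/** A minimal sketch (an assumption, not part of the original header) of building such a polygon
 * directly with boost::geometry; the default polygon model is clockwise and closed:
 *     Polygon square;
 *     bg::append(square.outer(), Point(0, 0));
 *     bg::append(square.outer(), Point(0, 10));
 *     bg::append(square.outer(), Point(10, 10));
 *     bg::append(square.outer(), Point(10, 0));
 *     bg::append(square.outer(), Point(0, 0));
 *     bool inside = bg::covered_by(Point(5, 5), square);  /// true
 */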
protected:
/** Returns true if the given point can be found in the polygon dictionary.
* If true id is set to the index of a polygon containing the given point.
* Overridden in different implementations of this interface.
*/
virtual bool find(const Point & point, size_t & id) const = 0;
std::vector<Polygon> polygons;
/** Since the original data may have been in the form of multi-polygons, an id is stored for each
 * single polygon, pointing to the source row that holds all other attributes of the corresponding entry.
 */
std::vector<size_t> ids;
const std::string database;
const std::string name;
const std::string full_name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const InputType input_type;
const PointType point_type;
private:
/** Helper functions for loading the data from the configuration.
* The polygons serving as keys are extracted into boost types.
* All other values are stored in one column per attribute.
*/
void createAttributes();
void blockToAttributes(const Block & block);
void loadData();
void calculateBytesAllocated();
/** Checks whether a given attribute exists and returns its index */
size_t getAttributeIndex(const std::string & attribute_name) const;
/** Helper functions to retrieve and instantiate the provided null value of an attribute.
* Since a null value is obligatory for every attribute, they are simply appended to null_values, defined below.
*/
template <typename T>
void appendNullValueImpl(const Field & null_value);
void appendNullValue(AttributeUnderlyingType type, const Field & value);
/** Helper function for retrieving the value of an attribute by key. */
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultGetter>
void getItemsImpl(size_t attribute_ind, const Columns & key_columns, ValueSetter && set_value, DefaultGetter && get_default) const;
/** A mapping from the names of the attributes to their index in the two vectors defined below. */
std::map<std::string, size_t> attribute_index_by_name;
/** A vector of columns storing the values of each attribute. */
Columns attributes;
/** A vector of null values corresponding to each attribute. */
std::vector<std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>> null_values;
size_t bytes_allocated = 0;
size_t element_count = 0;
mutable std::atomic<size_t> query_count{0};
/** Extracts a list of polygons from a column according to input_type and point_type.
* The polygons are appended to the dictionary with the corresponding ids.
*/
void extractPolygons(const ColumnPtr & column);
/** Extracts a list of points from two columns representing their x and y coordinates. */
static std::vector<Point> extractPoints(const Columns &key_columns);
};
/** Simple implementation of the polygon dictionary. Doesn't generate anything during its construction.
* Iterates over all stored polygons for each query, checking each of them in linear time.
* Retrieves the polygon with the smallest area containing the given point.
* If there is more than one such polygon, any of them may be returned.
*/
class SimplePolygonDictionary : public IPolygonDictionary
{
public:
SimplePolygonDictionary(
const std::string & database_,
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
DictionaryLifetime dict_lifetime_,
InputType input_type_,
PointType point_type_);
std::shared_ptr<const IExternalLoadable> clone() const override;
private:
bool find(const Point & point, size_t & id) const override;
};
}

View File

@ -29,6 +29,7 @@ void registerDictionaries()
registerDictionaryFlat(factory);
registerDictionaryHashed(factory);
registerDictionaryCache(factory);
registerDictionaryPolygon(factory);
}
}

View File

@ -24,6 +24,7 @@ void registerDictionaryTrie(DictionaryFactory & factory);
void registerDictionaryFlat(DictionaryFactory & factory);
void registerDictionaryHashed(DictionaryFactory & factory);
void registerDictionaryCache(DictionaryFactory & factory);
void registerDictionaryPolygon(DictionaryFactory & factory);
void registerDictionaries();
}

View File

@ -145,9 +145,19 @@ BlockInputStreamPtr FormatFactory::getInput(
// Doesn't make sense to use parallel parsing with fewer than four threads
// (segmentator + two parsers + reader).
if (settings.input_format_parallel_parsing
&& file_segmentation_engine
&& settings.max_threads >= 4)
bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4;
if (parallel_parsing && name == "JSONEachRow")
{
/// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix()
/// For JSONEachRow we can safely skip whitespace characters
skipWhitespaceIfAny(buf);
if (buf.eof() || *buf.position() == '[')
parallel_parsing = false; /// Disable it for JSONEachRow if data is in square brackets (see JSONEachRowRowInputFormat)
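/// Illustrative forms (assumed data, not from the source): [{"x":1},{"x":2}] is the bracketed
/// variant that disables parallel parsing here, while newline-delimited {"x":1} {"x":2} rows
/// keep it enabled.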
}
if (parallel_parsing)
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Types.h>
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <IO/BufferWithOwnMemory.h>
@ -9,7 +10,6 @@
#include <unordered_map>
#include <boost/noncopyable.hpp>
namespace DB
{
@ -53,7 +53,9 @@ public:
/// This callback allows to perform some additional actions after writing a single row.
/// It's initial purpose was to flush Kafka message for each row.
using WriteCallback = std::function<void()>;
using WriteCallback = std::function<void(
const Columns & columns,
size_t row)>;
private:
using InputCreator = std::function<BlockInputStreamPtr(

View File

@ -45,7 +45,7 @@ try
BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [] {}, format_settings));
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [](const Columns & /* columns */, size_t /* row */){}, format_settings));
copyData(*block_input, *block_output);
return 0;

View File

@ -12,10 +12,12 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeInterval.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/Native.h>
#include <DataTypes/NumberTraits.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnAggregateFunction.h>
#include "IFunctionImpl.h"
@ -59,6 +61,7 @@ template <typename A, typename B, typename Op, typename ResultType_ = typename O
struct BinaryOperationImplBase
{
using ResultType = ResultType_;
static const constexpr bool allow_fixed_string = false;
static void NO_INLINE vector_vector(const PaddedPODArray<A> & a, const PaddedPODArray<B> & b, PaddedPODArray<ResultType> & c)
{
@ -87,6 +90,33 @@ struct BinaryOperationImplBase
}
};
template <typename Op>
struct FixedStringOperationImpl
{
static void NO_INLINE vector_vector(const ColumnFixedString::Chars & a, const ColumnFixedString::Chars & b, ColumnFixedString::Chars & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = Op::template apply<UInt8>(a[i], b[i]);
}
static void NO_INLINE vector_constant(const ColumnFixedString::Chars & a, const ColumnFixedString::Chars & b, ColumnFixedString::Chars & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = Op::template apply<UInt8>(a[i], b[i % b.size()]);
}
static void NO_INLINE constant_vector(const ColumnFixedString::Chars & a, const ColumnFixedString::Chars & b, ColumnFixedString::Chars & c)
{
size_t size = b.size();
for (size_t i = 0; i < size; ++i)
c[i] = Op::template apply<UInt8>(a[i % a.size()], b[i]);
}
};
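/** A hedged worked example for FixedStringOperationImpl: for FixedString(2) with rows "ab", "cd"
 * (a = {'a','b','c','d'}) and the constant "xy" (b = {'x','y'}), vector_constant computes
 *     c[0] = Op('a','x'), c[1] = Op('b','y'), c[2] = Op('c','x'), c[3] = Op('d','y');
 * the constant is tiled across the vector byte by byte via the i % b.size() index.
 */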
template <typename A, typename B, typename Op, typename ResultType = typename Op::ResultType>
struct BinaryOperationImpl : BinaryOperationImplBase<A, B, Op, ResultType>
{
@ -463,7 +493,8 @@ class FunctionBinaryArithmetic : public IFunction
DataTypeDateTime,
DataTypeDecimal<Decimal32>,
DataTypeDecimal<Decimal64>,
DataTypeDecimal<Decimal128>
DataTypeDecimal<Decimal128>,
DataTypeFixedString
>(type, std::forward<F>(f));
}
@ -719,27 +750,43 @@ public:
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
if constexpr (!std::is_same_v<ResultDataType, InvalidType>)
if constexpr (std::is_same_v<DataTypeFixedString, LeftDataType> || std::is_same_v<DataTypeFixedString, RightDataType>)
{
if constexpr (IsDataTypeDecimal<LeftDataType> && IsDataTypeDecimal<RightDataType>)
if constexpr (!Op<DataTypeFixedString, DataTypeFixedString>::allow_fixed_string)
return false;
else if constexpr (std::is_same_v<LeftDataType, RightDataType>)
{
constexpr bool is_multiply = std::is_same_v<Op<UInt8, UInt8>, MultiplyImpl<UInt8, UInt8>>;
constexpr bool is_division = std::is_same_v<Op<UInt8, UInt8>, DivideFloatingImpl<UInt8, UInt8>> ||
std::is_same_v<Op<UInt8, UInt8>, DivideIntegralImpl<UInt8, UInt8>> ||
std::is_same_v<Op<UInt8, UInt8>, DivideIntegralOrZeroImpl<UInt8, UInt8>>;
ResultDataType result_type = decimalResultType(left, right, is_multiply, is_division);
type_res = std::make_shared<ResultDataType>(result_type.getPrecision(), result_type.getScale());
if (left.getN() == right.getN())
{
type_res = std::make_shared<LeftDataType>(left.getN());
return true;
}
}
else if constexpr (IsDataTypeDecimal<LeftDataType>)
type_res = std::make_shared<LeftDataType>(left.getPrecision(), left.getScale());
else if constexpr (IsDataTypeDecimal<RightDataType>)
type_res = std::make_shared<RightDataType>(right.getPrecision(), right.getScale());
else
type_res = std::make_shared<ResultDataType>();
return true;
}
else
{
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
if constexpr (!std::is_same_v<ResultDataType, InvalidType>)
{
if constexpr (IsDataTypeDecimal<LeftDataType> && IsDataTypeDecimal<RightDataType>)
{
constexpr bool is_multiply = std::is_same_v<Op<UInt8, UInt8>, MultiplyImpl<UInt8, UInt8>>;
constexpr bool is_division = std::is_same_v<Op<UInt8, UInt8>, DivideFloatingImpl<UInt8, UInt8>> ||
std::is_same_v<Op<UInt8, UInt8>, DivideIntegralImpl<UInt8, UInt8>> ||
std::is_same_v<Op<UInt8, UInt8>, DivideIntegralOrZeroImpl<UInt8, UInt8>>;
ResultDataType result_type = decimalResultType(left, right, is_multiply, is_division);
type_res = std::make_shared<ResultDataType>(result_type.getPrecision(), result_type.getScale());
}
else if constexpr (IsDataTypeDecimal<LeftDataType>)
type_res = std::make_shared<LeftDataType>(left.getPrecision(), left.getScale());
else if constexpr (IsDataTypeDecimal<RightDataType>)
type_res = std::make_shared<RightDataType>(right.getPrecision(), right.getScale());
else
type_res = std::make_shared<ResultDataType>();
return true;
}
}
return false;
});
if (!valid)
@ -748,7 +795,206 @@ public:
return type_res;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
bool executeFixedString(Block & block, const ColumnNumbers & arguments, size_t result)
{
using OpImpl = FixedStringOperationImpl<Op<UInt8, UInt8>>;
auto col_left_raw = block.getByPosition(arguments[0]).column.get();
auto col_right_raw = block.getByPosition(arguments[1]).column.get();
if (auto col_left_const = checkAndGetColumnConst<ColumnFixedString>(col_left_raw))
{
if (auto col_right_const = checkAndGetColumnConst<ColumnFixedString>(col_right_raw))
{
auto col_left = checkAndGetColumn<ColumnFixedString>(col_left_const->getDataColumn());
auto col_right = checkAndGetColumn<ColumnFixedString>(col_right_const->getDataColumn());
if (col_left->getN() != col_right->getN())
return false;
auto col_res = ColumnFixedString::create(col_left->getN());
auto & out_chars = col_res->getChars();
/// Both arguments are constants, so a single value of N bytes is computed and wrapped in a ColumnConst.
out_chars.resize(col_left->getN());
OpImpl::vector_vector(col_left->getChars(),
col_right->getChars(),
out_chars);
block.getByPosition(result).column = ColumnConst::create(std::move(col_res), block.rows());
return true;
}
}
bool is_left_column_const = checkAndGetColumnConst<ColumnFixedString>(col_left_raw) != nullptr;
bool is_right_column_const = checkAndGetColumnConst<ColumnFixedString>(col_right_raw) != nullptr;
auto col_left = is_left_column_const
? checkAndGetColumn<ColumnFixedString>(checkAndGetColumnConst<ColumnFixedString>(col_left_raw)->getDataColumn())
: checkAndGetColumn<ColumnFixedString>(col_left_raw);
auto col_right = is_right_column_const
? checkAndGetColumn<ColumnFixedString>(checkAndGetColumnConst<ColumnFixedString>(col_right_raw)->getDataColumn())
: checkAndGetColumn<ColumnFixedString>(col_right_raw);
if (col_left && col_right)
{
if (col_left->getN() != col_right->getN())
return false;
auto col_res = ColumnFixedString::create(col_left->getN());
auto & out_chars = col_res->getChars();
out_chars.resize((is_right_column_const ? col_left->size() : col_right->size()) * col_left->getN());
if (!is_left_column_const && !is_right_column_const)
{
OpImpl::vector_vector(col_left->getChars(),
col_right->getChars(),
out_chars);
}
else if (is_left_column_const)
{
OpImpl::constant_vector(col_left->getChars(),
col_right->getChars(),
out_chars);
}
else
{
OpImpl::vector_constant(col_left->getChars(),
col_right->getChars(),
out_chars);
}
block.getByPosition(result).column = std::move(col_res);
return true;
}
return false;
}
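/** A hedged usage sketch for the fixed string path above: a query such as
 * SELECT bitAnd(toFixedString('ab', 2), toFixedString('cd', 2))
 * is assumed to operate byte-wise and return a FixedString(2) value; operands with different N
 * make executeFixedString return false, and the caller reports a type mismatch.
 */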
template <typename A, typename B>
bool executeNumeric(Block & block, const ColumnNumbers & arguments, size_t result [[maybe_unused]], const A & left, const B & right)
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
if constexpr (!std::is_same_v<ResultDataType, InvalidType>)
{
constexpr bool result_is_decimal = IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>;
constexpr bool is_multiply = std::is_same_v<Op<UInt8, UInt8>, MultiplyImpl<UInt8, UInt8>>;
constexpr bool is_division = std::is_same_v<Op<UInt8, UInt8>, DivideFloatingImpl<UInt8, UInt8>> ||
std::is_same_v<Op<UInt8, UInt8>, DivideIntegralImpl<UInt8, UInt8>> ||
std::is_same_v<Op<UInt8, UInt8>, DivideIntegralOrZeroImpl<UInt8, UInt8>>;
using T0 = typename LeftDataType::FieldType;
using T1 = typename RightDataType::FieldType;
using ResultType = typename ResultDataType::FieldType;
using ColVecT0 = std::conditional_t<IsDecimalNumber<T0>, ColumnDecimal<T0>, ColumnVector<T0>>;
using ColVecT1 = std::conditional_t<IsDecimalNumber<T1>, ColumnDecimal<T1>, ColumnVector<T1>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<ResultType>, ColumnDecimal<ResultType>, ColumnVector<ResultType>>;
/// Decimal operations need scale. Operations are on result type.
using OpImpl = std::conditional_t<IsDataTypeDecimal<ResultDataType>,
DecimalBinaryOperation<T0, T1, Op, ResultType>,
BinaryOperationImpl<T0, T1, Op<T0, T1>, ResultType>>;
auto col_left_raw = block.getByPosition(arguments[0]).column.get();
auto col_right_raw = block.getByPosition(arguments[1]).column.get();
if (auto col_left = checkAndGetColumnConst<ColVecT0>(col_left_raw))
{
if (auto col_right = checkAndGetColumnConst<ColVecT1>(col_right_raw))
{
/// the only case with a non-vector result
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
scale_a = right.getScaleMultiplier();
auto res = OpImpl::constant_constant(col_left->template getValue<T0>(), col_right->template getValue<T1>(),
scale_a, scale_b, check_decimal_overflow);
block.getByPosition(result).column =
ResultDataType(type.getPrecision(), type.getScale()).createColumnConst(
col_left->size(), toField(res, type.getScale()));
}
else
{
auto res = OpImpl::constant_constant(col_left->template getValue<T0>(), col_right->template getValue<T1>());
block.getByPosition(result).column = ResultDataType().createColumnConst(col_left->size(), toField(res));
}
return true;
}
}
typename ColVecResult::MutablePtr col_res = nullptr;
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
col_res = ColVecResult::create(0, type.getScale());
}
else
col_res = ColVecResult::create();
auto & vec_res = col_res->getData();
vec_res.resize(block.rows());
if (auto col_left_const = checkAndGetColumnConst<ColVecT0>(col_left_raw))
{
if (auto col_right = checkAndGetColumn<ColVecT1>(col_right_raw))
{
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
scale_a = right.getScaleMultiplier();
OpImpl::constant_vector(col_left_const->template getValue<T0>(), col_right->getData(), vec_res,
scale_a, scale_b, check_decimal_overflow);
}
else
OpImpl::constant_vector(col_left_const->template getValue<T0>(), col_right->getData(), vec_res);
}
else
return false;
}
else if (auto col_left = checkAndGetColumn<ColVecT0>(col_left_raw))
{
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
scale_a = right.getScaleMultiplier();
if (auto col_right = checkAndGetColumn<ColVecT1>(col_right_raw))
{
OpImpl::vector_vector(col_left->getData(), col_right->getData(), vec_res, scale_a, scale_b,
check_decimal_overflow);
}
else if (auto col_right_const = checkAndGetColumnConst<ColVecT1>(col_right_raw))
{
OpImpl::vector_constant(col_left->getData(), col_right_const->template getValue<T1>(), vec_res,
scale_a, scale_b, check_decimal_overflow);
}
else
return false;
}
else
{
if (auto col_right = checkAndGetColumn<ColVecT1>(col_right_raw))
OpImpl::vector_vector(col_left->getData(), col_right->getData(), vec_res);
else if (auto col_right_const = checkAndGetColumnConst<ColVecT1>(col_right_raw))
OpImpl::vector_constant(col_left->getData(), col_right_const->template getValue<T1>(), vec_res);
else
return false;
}
}
else
return false;
block.getByPosition(result).column = std::move(col_res);
return true;
}
return false;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
/// Special case: multiplying an aggregate function state.
if (isAggregateMultiply(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
@ -777,132 +1023,15 @@ public:
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
if constexpr (!std::is_same_v<ResultDataType, InvalidType>)
if constexpr (std::is_same_v<DataTypeFixedString, LeftDataType> || std::is_same_v<DataTypeFixedString, RightDataType>)
{
constexpr bool result_is_decimal = IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>;
constexpr bool is_multiply = std::is_same_v<Op<UInt8, UInt8>, MultiplyImpl<UInt8, UInt8>>;
constexpr bool is_division = std::is_same_v<Op<UInt8, UInt8>, DivideFloatingImpl<UInt8, UInt8>> ||
std::is_same_v<Op<UInt8, UInt8>, DivideIntegralImpl<UInt8, UInt8>> ||
std::is_same_v<Op<UInt8, UInt8>, DivideIntegralOrZeroImpl<UInt8, UInt8>>;
using T0 = typename LeftDataType::FieldType;
using T1 = typename RightDataType::FieldType;
using ResultType = typename ResultDataType::FieldType;
using ColVecT0 = std::conditional_t<IsDecimalNumber<T0>, ColumnDecimal<T0>, ColumnVector<T0>>;
using ColVecT1 = std::conditional_t<IsDecimalNumber<T1>, ColumnDecimal<T1>, ColumnVector<T1>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<ResultType>, ColumnDecimal<ResultType>, ColumnVector<ResultType>>;
/// Decimal operations need scale. Operations are on result type.
using OpImpl = std::conditional_t<IsDataTypeDecimal<ResultDataType>,
DecimalBinaryOperation<T0, T1, Op, ResultType>,
BinaryOperationImpl<T0, T1, Op<T0, T1>, ResultType>>;
auto col_left_raw = block.getByPosition(arguments[0]).column.get();
auto col_right_raw = block.getByPosition(arguments[1]).column.get();
if (auto col_left = checkAndGetColumnConst<ColVecT0>(col_left_raw))
{
if (auto col_right = checkAndGetColumnConst<ColVecT1>(col_right_raw))
{
/// the only case with a non-vector result
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
scale_a = right.getScaleMultiplier();
auto res = OpImpl::constant_constant(col_left->template getValue<T0>(), col_right->template getValue<T1>(),
scale_a, scale_b, check_decimal_overflow);
block.getByPosition(result).column =
ResultDataType(type.getPrecision(), type.getScale()).createColumnConst(
col_left->size(), toField(res, type.getScale()));
}
else
{
auto res = OpImpl::constant_constant(col_left->template getValue<T0>(), col_right->template getValue<T1>());
block.getByPosition(result).column = ResultDataType().createColumnConst(col_left->size(), toField(res));
}
return true;
}
}
typename ColVecResult::MutablePtr col_res = nullptr;
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
col_res = ColVecResult::create(0, type.getScale());
}
else
col_res = ColVecResult::create();
auto & vec_res = col_res->getData();
vec_res.resize(block.rows());
if (auto col_left_const = checkAndGetColumnConst<ColVecT0>(col_left_raw))
{
if (auto col_right = checkAndGetColumn<ColVecT1>(col_right_raw))
{
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
scale_a = right.getScaleMultiplier();
OpImpl::constant_vector(col_left_const->template getValue<T0>(), col_right->getData(), vec_res,
scale_a, scale_b, check_decimal_overflow);
}
else
OpImpl::constant_vector(col_left_const->template getValue<T0>(), col_right->getData(), vec_res);
}
else
return false;
}
else if (auto col_left = checkAndGetColumn<ColVecT0>(col_left_raw))
{
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
scale_a = right.getScaleMultiplier();
if (auto col_right = checkAndGetColumn<ColVecT1>(col_right_raw))
{
OpImpl::vector_vector(col_left->getData(), col_right->getData(), vec_res, scale_a, scale_b,
check_decimal_overflow);
}
else if (auto col_right_const = checkAndGetColumnConst<ColVecT1>(col_right_raw))
{
OpImpl::vector_constant(col_left->getData(), col_right_const->template getValue<T1>(), vec_res,
scale_a, scale_b, check_decimal_overflow);
}
else
return false;
}
else
{
if (auto col_right = checkAndGetColumn<ColVecT1>(col_right_raw))
OpImpl::vector_vector(col_left->getData(), col_right->getData(), vec_res);
else if (auto col_right_const = checkAndGetColumnConst<ColVecT1>(col_right_raw))
OpImpl::vector_constant(col_left->getData(), col_right_const->template getValue<T1>(), vec_res);
else
return false;
}
}
else
if constexpr (!Op<DataTypeFixedString, DataTypeFixedString>::allow_fixed_string)
return false;
block.getByPosition(result).column = std::move(col_res);
return true;
else
return executeFixedString(block, arguments, result);
}
return false;
else
return executeNumeric(block, arguments, result, left, right);
});
if (!valid)
throw Exception(getName() + "'s arguments do not match the expected data types", ErrorCodes::LOGICAL_ERROR);
@ -915,9 +1044,14 @@ public:
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
return !std::is_same_v<ResultDataType, InvalidType> && !IsDataTypeDecimal<ResultDataType> && OpSpec::compilable;
if constexpr (std::is_same_v<DataTypeFixedString, LeftDataType> || std::is_same_v<DataTypeFixedString, RightDataType>)
return false;
else
{
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
return !std::is_same_v<ResultDataType, InvalidType> && !IsDataTypeDecimal<ResultDataType> && OpSpec::compilable;
}
});
}
@ -928,16 +1062,19 @@ public:
{
using LeftDataType = std::decay_t<decltype(left)>;
using RightDataType = std::decay_t<decltype(right)>;
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
if constexpr (!std::is_same_v<ResultDataType, InvalidType> && !IsDataTypeDecimal<ResultDataType> && OpSpec::compilable)
if constexpr (!std::is_same_v<DataTypeFixedString, LeftDataType> && !std::is_same_v<DataTypeFixedString, RightDataType>)
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto type = std::make_shared<ResultDataType>();
auto * lval = nativeCast(b, types[0], values[0](), type);
auto * rval = nativeCast(b, types[1], values[1](), type);
result = OpSpec::compile(b, lval, rval, std::is_signed_v<typename ResultDataType::FieldType>);
return true;
using ResultDataType = typename BinaryOperationTraits<Op, LeftDataType, RightDataType>::ResultDataType;
using OpSpec = Op<typename LeftDataType::FieldType, typename RightDataType::FieldType>;
if constexpr (!std::is_same_v<ResultDataType, InvalidType> && !IsDataTypeDecimal<ResultDataType> && OpSpec::compilable)
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto type = std::make_shared<ResultDataType>();
auto * lval = nativeCast(b, types[0], values[0](), type);
auto * rval = nativeCast(b, types[1], values[1](), type);
result = OpSpec::compile(b, lval, rval, std::is_signed_v<typename ResultDataType::FieldType>);
return true;
}
}
return false;
});

View File

@ -2,9 +2,11 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/Native.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Functions/IFunctionImpl.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/castTypeToEither.h>
@ -51,6 +53,18 @@ struct UnaryOperationImpl
};
template <typename Op>
struct FixedStringUnaryOperationImpl
{
static void NO_INLINE vector(const ColumnFixedString::Chars & a, ColumnFixedString::Chars & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = Op::apply(a[i]);
}
};
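/** A hedged example for FixedStringUnaryOperationImpl: a FixedString(3) column with rows "abc"
 * and "def" has a 6-byte chars buffer, and vector() applies Op to every byte, e.g. bitNot
 * flips each of the 6 bytes.
 */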
template <typename FunctionName>
struct FunctionUnaryArithmeticMonotonicity;
@ -65,6 +79,7 @@ template <template <typename> class Op, typename Name, bool is_injective>
class FunctionUnaryArithmetic : public IFunction
{
static constexpr bool allow_decimal = std::is_same_v<Op<Int8>, NegateImpl<Int8>> || std::is_same_v<Op<Int8>, AbsImpl<Int8>>;
static constexpr bool allow_fixed_string = Op<UInt8>::allow_fixed_string;
template <typename F>
static bool castType(const IDataType * type, F && f)
@ -82,7 +97,8 @@ class FunctionUnaryArithmetic : public IFunction
DataTypeFloat64,
DataTypeDecimal<Decimal32>,
DataTypeDecimal<Decimal64>,
DataTypeDecimal<Decimal128>
DataTypeDecimal<Decimal128>,
DataTypeFixedString
>(type, std::forward<F>(f));
}
@ -106,16 +122,25 @@ public:
bool valid = castType(arguments[0].get(), [&](const auto & type)
{
using DataType = std::decay_t<decltype(type)>;
using T0 = typename DataType::FieldType;
if constexpr (IsDataTypeDecimal<DataType>)
if constexpr (std::is_same_v<DataTypeFixedString, DataType>)
{
if constexpr (!allow_decimal)
if constexpr (!Op<DataTypeFixedString>::allow_fixed_string)
return false;
result = std::make_shared<DataType>(type.getPrecision(), type.getScale());
result = std::make_shared<DataType>(type.getN());
}
else
result = std::make_shared<DataTypeNumber<typename Op<T0>::ResultType>>();
{
using T0 = typename DataType::FieldType;
if constexpr (IsDataTypeDecimal<DataType>)
{
if constexpr (!allow_decimal)
return false;
result = std::make_shared<DataType>(type.getPrecision(), type.getScale());
}
else
result = std::make_shared<DataTypeNumber<typename Op<T0>::ResultType>>();
}
return true;
});
if (!valid)
@ -129,10 +154,25 @@ public:
bool valid = castType(block.getByPosition(arguments[0]).type.get(), [&](const auto & type)
{
using DataType = std::decay_t<decltype(type)>;
using T0 = typename DataType::FieldType;
if constexpr (IsDataTypeDecimal<DataType>)
if constexpr (std::is_same_v<DataTypeFixedString, DataType>)
{
if constexpr (allow_fixed_string)
{
if (auto col = checkAndGetColumn<ColumnFixedString>(block.getByPosition(arguments[0]).column.get()))
{
auto col_res = ColumnFixedString::create(col->getN());
auto & vec_res = col_res->getChars();
vec_res.resize(col->size() * col->getN());
FixedStringUnaryOperationImpl<Op<UInt8>>::vector(col->getChars(), vec_res);
block.getByPosition(result).column = std::move(col_res);
return true;
}
}
}
else if constexpr (IsDataTypeDecimal<DataType>)
{
using T0 = typename DataType::FieldType;
if constexpr (allow_decimal)
{
if (auto col = checkAndGetColumn<ColumnDecimal<T0>>(block.getByPosition(arguments[0]).column.get()))
@ -148,6 +188,7 @@ public:
}
else
{
using T0 = typename DataType::FieldType;
if (auto col = checkAndGetColumn<ColumnVector<T0>>(block.getByPosition(arguments[0]).column.get()))
{
auto col_res = ColumnVector<typename Op<T0>::ResultType>::create();
@ -171,7 +212,10 @@ public:
return castType(arguments[0].get(), [&](const auto & type)
{
using DataType = std::decay_t<decltype(type)>;
return !IsDataTypeDecimal<DataType> && Op<typename DataType::FieldType>::compilable;
if constexpr (std::is_same_v<DataTypeFixedString, DataType>)
return false;
else
return !IsDataTypeDecimal<DataType> && Op<typename DataType::FieldType>::compilable;
});
}
@ -181,14 +225,19 @@ public:
castType(types[0].get(), [&](const auto & type)
{
using DataType = std::decay_t<decltype(type)>;
using T0 = typename DataType::FieldType;
using T1 = typename Op<T0>::ResultType;
if constexpr (!std::is_same_v<T1, InvalidType> && !IsDataTypeDecimal<DataType> && Op<T0>::compilable)
if constexpr (std::is_same_v<DataTypeFixedString, DataType>)
return false;
else
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto * v = nativeCast(b, types[0], values[0](), std::make_shared<DataTypeNumber<T1>>());
result = Op<T0>::compile(b, v, is_signed_v<T1>);
return true;
using T0 = typename DataType::FieldType;
using T1 = typename Op<T0>::ResultType;
if constexpr (!std::is_same_v<T1, InvalidType> && !IsDataTypeDecimal<DataType> && Op<T0>::compilable)
{
auto & b = static_cast<llvm::IRBuilder<> &>(builder);
auto * v = nativeCast(b, types[0], values[0](), std::make_shared<DataTypeNumber<T1>>());
result = Op<T0>::compile(b, v, is_signed_v<T1>);
return true;
}
}
return false;
});

View File

@ -35,7 +35,6 @@
#include <Columns/ColumnsCommon.h>
#include <Common/FieldVisitors.h>
#include <Common/assert_cast.h>
#include <Interpreters/ExpressionActions.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Functions/FunctionHelpers.h>

View File

@ -33,6 +33,7 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include <Dictionaries/RangeHashedDictionary.h>
#include <Dictionaries/TrieDictionary.h>
#include <Dictionaries/PolygonDictionary.h>
#include <ext/range.h>
@ -134,7 +135,8 @@ private:
!executeDispatchSimple<CacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr))
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SimplePolygonDictionary>(block, arguments, result, dict_ptr))
throw Exception{"Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE};
}
@ -305,6 +307,7 @@ private:
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SimplePolygonDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchRange<RangeHashedDictionary>(block, arguments, result, dict_ptr))
throw Exception{"Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE};
}
@ -485,6 +488,7 @@ private:
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SimplePolygonDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr))
throw Exception{"Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE};
}
@ -823,6 +827,7 @@ private:
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SimplePolygonDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchRange<RangeHashedDictionary>(block, arguments, result, dict_ptr))
throw Exception{"Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE};
}
@ -1081,6 +1086,7 @@ private:
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SimplePolygonDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr))
throw Exception{"Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE};
}

View File

@ -261,7 +261,7 @@ using FunctionReinterpretAsDate = FunctionReinterpretStringAs<DataTypeDate,
using FunctionReinterpretAsDateTime = FunctionReinterpretStringAs<DataTypeDateTime, NameReinterpretAsDateTime>;
using FunctionReinterpretAsString = FunctionReinterpretAsStringImpl<NameReinterpretAsString>;
using FunctionReinterpretAsFixedString = FunctionReinterpretAsStringImpl<NameReinterpretAsFixedString>;
using FunctionReinterpretAsFixedString = FunctionReinterpretAsFixedStringImpl<NameReinterpretAsFixedString>;
}

View File

@ -10,6 +10,7 @@ template <typename A>
struct AbsImpl
{
using ResultType = std::conditional_t<IsDecimalNumber<A>, A, typename NumberTraits::ResultOfAbs<A>::Type>;
static const constexpr bool allow_fixed_string = false;
static inline NO_SANITIZE_UNDEFINED ResultType apply(A a)
{

View File

@ -9,6 +9,7 @@ template <typename A, typename B>
struct BitAndImpl
{
using ResultType = typename NumberTraits::ResultOfBit<A, B>::Type;
static constexpr const bool allow_fixed_string = true;
template <typename Result = ResultType>
static inline Result apply(A a, B b)
@ -29,7 +30,7 @@ struct BitAndImpl
};
struct NameBitAnd { static constexpr auto name = "bitAnd"; };
using FunctionBitAnd = FunctionBinaryArithmetic<BitAndImpl, NameBitAnd>;
using FunctionBitAnd = FunctionBinaryArithmetic<BitAndImpl, NameBitAnd, true>;
void registerFunctionBitAnd(FunctionFactory & factory)
{

View File

@ -18,6 +18,7 @@ namespace DB
struct BitBoolMaskAndImpl
{
using ResultType = UInt8;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A left, B right)

View File

@ -18,6 +18,7 @@ namespace DB
struct BitBoolMaskOrImpl
{
using ResultType = UInt8;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A left, B right)

View File

@ -10,6 +10,7 @@ template <typename A>
struct BitCountImpl
{
using ResultType = UInt8;
static constexpr bool allow_fixed_string = false;
static inline ResultType apply(A a)
{

View File

@ -10,6 +10,7 @@ template <typename A>
struct BitNotImpl
{
using ResultType = typename NumberTraits::ResultOfBitNot<A>::Type;
static const constexpr bool allow_fixed_string = true;
static inline ResultType apply(A a)
{

View File

@ -8,6 +8,7 @@ template <typename A, typename B>
struct BitOrImpl
{
using ResultType = typename NumberTraits::ResultOfBit<A, B>::Type;
static constexpr const bool allow_fixed_string = true;
template <typename Result = ResultType>
static inline Result apply(A a, B b)
@ -28,7 +29,7 @@ struct BitOrImpl
};
struct NameBitOr { static constexpr auto name = "bitOr"; };
using FunctionBitOr = FunctionBinaryArithmetic<BitOrImpl, NameBitOr>;
using FunctionBitOr = FunctionBinaryArithmetic<BitOrImpl, NameBitOr, true>;
void registerFunctionBitOr(FunctionFactory & factory)
{

View File

@ -8,6 +8,7 @@ template <typename A, typename B>
struct BitRotateLeftImpl
{
using ResultType = typename NumberTraits::ResultOfBit<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)

View File

@ -8,6 +8,7 @@ template <typename A, typename B>
struct BitRotateRightImpl
{
using ResultType = typename NumberTraits::ResultOfBit<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)

View File

@ -8,6 +8,7 @@ template <typename A, typename B>
struct BitShiftLeftImpl
{
using ResultType = typename NumberTraits::ResultOfBit<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)

View File

@ -8,6 +8,7 @@ template <typename A, typename B>
struct BitShiftRightImpl
{
using ResultType = typename NumberTraits::ResultOfBit<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)

View File

@ -15,6 +15,7 @@ namespace DB
struct BitSwapLastTwoImpl
{
using ResultType = UInt8;
static constexpr const bool allow_fixed_string = false;
static inline ResultType NO_SANITIZE_UNDEFINED apply(A a)
{

View File

@ -10,6 +10,7 @@ template <typename A, typename B>
struct BitTestImpl
{
using ResultType = UInt8;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
NO_SANITIZE_UNDEFINED static inline Result apply(A a, B b)

View File

@ -16,6 +16,7 @@ namespace DB
struct BitWrapperFuncImpl
{
using ResultType = UInt8;
static constexpr const bool allow_fixed_string = false;
static inline ResultType NO_SANITIZE_UNDEFINED apply(A a)
{

View File

@ -8,6 +8,7 @@ template <typename A, typename B>
struct BitXorImpl
{
using ResultType = typename NumberTraits::ResultOfBit<A, B>::Type;
static constexpr bool allow_fixed_string = true;
template <typename Result = ResultType>
static inline Result apply(A a, B b)
@ -28,7 +29,7 @@ struct BitXorImpl
};
struct NameBitXor { static constexpr auto name = "bitXor"; };
using FunctionBitXor = FunctionBinaryArithmetic<BitXorImpl, NameBitXor>;
using FunctionBitXor = FunctionBinaryArithmetic<BitXorImpl, NameBitXor, true>;
void registerFunctionBitXor(FunctionFactory & factory)
{

View File

@ -9,6 +9,7 @@ struct DivideFloatingImpl
{
using ResultType = typename NumberTraits::ResultOfFloatingPointDivision<A, B>::Type;
static const constexpr bool allow_decimal = true;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)

View File

@ -10,6 +10,7 @@ template <typename A, typename B>
struct GCDImpl
{
using ResultType = typename NumberTraits::ResultOfAdditionMultiplication<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)

View File

@ -9,6 +9,7 @@ template <typename A, typename B>
struct GreatestBaseImpl
{
using ResultType = NumberTraits::ResultOfGreatest<A, B>;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)
@ -34,6 +35,7 @@ template <typename A, typename B>
struct GreatestSpecialImpl
{
using ResultType = std::make_unsigned_t<A>;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)

View File

@ -19,6 +19,7 @@ struct DivideIntegralByConstantImpl
: BinaryOperationImplBase<A, B, DivideIntegralImpl<A, B>>
{
using ResultType = typename DivideIntegralImpl<A, B>::ResultType;
static const constexpr bool allow_fixed_string = false;
static void vector_constant(const PaddedPODArray<A> & a, B b, PaddedPODArray<ResultType> & c)
{

View File

@ -50,6 +50,7 @@ template <typename A, typename B>
struct DivideIntegralImpl
{
using ResultType = typename NumberTraits::ResultOfIntegerDivision<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)

View File

@ -11,6 +11,7 @@ template <typename A, typename B>
struct DivideIntegralOrZeroImpl
{
using ResultType = typename NumberTraits::ResultOfIntegerDivision<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)

View File

@ -10,6 +10,7 @@ template <typename A>
struct IntExp10Impl
{
using ResultType = UInt64;
static constexpr const bool allow_fixed_string = false;
static inline ResultType apply(A a)
{

View File

@ -10,6 +10,7 @@ template <typename A>
struct IntExp2Impl
{
using ResultType = UInt64;
static constexpr const bool allow_fixed_string = false;
static inline ResultType apply(A a)
{

View File

@ -10,6 +10,8 @@ template <typename A, typename B>
struct LCMImpl
{
using ResultType = typename NumberTraits::ResultOfAdditionMultiplication<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)

View File

@ -9,6 +9,7 @@ template <typename A, typename B>
struct LeastBaseImpl
{
using ResultType = NumberTraits::ResultOfLeast<A, B>;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)
@ -34,6 +35,7 @@ template <typename A, typename B>
struct LeastSpecialImpl
{
using ResultType = std::make_signed_t<A>;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)

View File

@ -10,6 +10,7 @@ struct MinusImpl
{
using ResultType = typename NumberTraits::ResultOfSubtraction<A, B>::Type;
static const constexpr bool allow_decimal = true;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)

View File

@ -20,6 +20,7 @@ template <typename A, typename B>
struct ModuloImpl
{
using ResultType = typename NumberTraits::ResultOfModulo<A, B>::Type;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline Result apply(A a, B b)
@ -38,6 +39,7 @@ struct ModuloByConstantImpl
: BinaryOperationImplBase<A, B, ModuloImpl<A, B>>
{
using ResultType = typename ModuloImpl<A, B>::ResultType;
static const constexpr bool allow_fixed_string = false;
static void vector_constant(const PaddedPODArray<A> & a, B b, PaddedPODArray<ResultType> & c)
{

View File

@ -10,6 +10,7 @@ struct MultiplyImpl
{
using ResultType = typename NumberTraits::ResultOfAdditionMultiplication<A, B>::Type;
static const constexpr bool allow_decimal = true;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)

View File

@ -9,6 +9,7 @@ template <typename A>
struct NegateImpl
{
using ResultType = std::conditional_t<IsDecimalNumber<A>, A, typename NumberTraits::ResultOfNegate<A>::Type>;
static constexpr const bool allow_fixed_string = false;
static inline NO_SANITIZE_UNDEFINED ResultType apply(A a)
{

View File

@ -10,6 +10,7 @@ struct PlusImpl
{
using ResultType = typename NumberTraits::ResultOfAdditionMultiplication<A, B>::Type;
static const constexpr bool allow_decimal = true;
static const constexpr bool allow_fixed_string = false;
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)

View File

@ -8,6 +8,7 @@ template <typename A>
struct RoundAgeImpl
{
using ResultType = UInt8;
static constexpr const bool allow_fixed_string = false;
static inline ResultType apply(A x)
{

View File

@ -8,6 +8,7 @@ template <typename A>
struct RoundDurationImpl
{
using ResultType = UInt16;
static constexpr const bool allow_fixed_string = false;
static inline ResultType apply(A x)
{

View File

@ -49,6 +49,7 @@ template <typename T>
struct RoundToExp2Impl
{
using ResultType = T;
static constexpr const bool allow_fixed_string = false;
static inline T apply(T x)
{

View File

@ -16,6 +16,7 @@ struct ConnectionTimeouts
Poco::Timespan receive_timeout;
Poco::Timespan tcp_keep_alive_timeout;
Poco::Timespan http_keep_alive_timeout;
Poco::Timespan secure_connection_timeout;
ConnectionTimeouts() = default;
@ -26,7 +27,8 @@ struct ConnectionTimeouts
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(0),
http_keep_alive_timeout(0)
http_keep_alive_timeout(0),
secure_connection_timeout(connection_timeout)
{
}
@ -38,7 +40,8 @@ struct ConnectionTimeouts
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(0)
http_keep_alive_timeout(0),
secure_connection_timeout(connection_timeout)
{
}
ConnectionTimeouts(const Poco::Timespan & connection_timeout_,
@ -50,10 +53,25 @@ struct ConnectionTimeouts
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(http_keep_alive_timeout_)
http_keep_alive_timeout(http_keep_alive_timeout_),
secure_connection_timeout(connection_timeout)
{
}
ConnectionTimeouts(const Poco::Timespan & connection_timeout_,
const Poco::Timespan & send_timeout_,
const Poco::Timespan & receive_timeout_,
const Poco::Timespan & tcp_keep_alive_timeout_,
const Poco::Timespan & http_keep_alive_timeout_,
const Poco::Timespan & secure_connection_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(http_keep_alive_timeout_),
secure_connection_timeout(secure_connection_timeout_)
{
}
static Poco::Timespan saturate(const Poco::Timespan & timespan, const Poco::Timespan & limit)
{
@ -69,7 +87,8 @@ struct ConnectionTimeouts
saturate(send_timeout, limit),
saturate(receive_timeout, limit),
saturate(tcp_keep_alive_timeout, limit),
saturate(http_keep_alive_timeout, limit));
saturate(http_keep_alive_timeout, limit),
saturate(secure_connection_timeout, limit));
}
/// Timeouts for the case when we have just a single attempt to connect.
@ -81,7 +100,7 @@ struct ConnectionTimeouts
/// Timeouts for the case when we will try many addresses in a loop.
static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout_with_failover_ms, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout);
return ConnectionTimeouts(settings.connect_timeout_with_failover_ms, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout, 0, settings.connect_timeout_with_failover_secure_ms);
}
static ConnectionTimeouts getHTTPTimeouts(const Context & context)

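The new secure_connection_timeout field follows the same pattern as the other timeouts, including being clamped by saturate(). A minimal standalone sketch of the saturate() behaviour, with std::chrono standing in for Poco::Timespan; the zero-means-no-limit convention is an assumption read off the surrounding code:

#include <algorithm>
#include <chrono>
#include <iostream>

using Timespan = std::chrono::milliseconds;

/// Clamp a timeout to a limit; a zero limit is assumed to mean "no limit".
Timespan saturate(Timespan value, Timespan limit)
{
    if (limit.count() == 0)
        return value;
    return std::min(value, limit);
}

int main()
{
    Timespan secure_connection_timeout{1500};
    std::cout << saturate(secure_connection_timeout, Timespan{1000}).count() << " ms\n";  /// 1000 ms
    std::cout << saturate(secure_connection_timeout, Timespan{0}).count() << " ms\n";     /// 1500 ms (no limit)
}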
View File

@ -6,17 +6,28 @@
#include <port/unistd.h>
#include <IO/ReadBufferAIO.h>
#include <fstream>
#include <string>
namespace
{
std::string createTmpFileForEOFtest()
{
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
return std::string(dir) + "/foo";
if (char * dir = ::mkdtemp(pattern); dir)
{
return std::string(dir) + "/foo";
}
else
{
/// There may be no /tmp inside a docker container,
/// so fall back to a directory under the filesystem root.
std::string almost_rand_dir = std::string{"/"} + std::to_string(rand()) + "foo";
return almost_rand_dir;
}
}
void prepare_for_eof(std::string & filename, std::string & buf)
void prepareForEOF(std::string & filename, std::string & buf)
{
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
@ -28,7 +39,7 @@ void prepare_for_eof(std::string & filename, std::string & buf)
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
std::ofstream out(filename.c_str());
std::ofstream out(filename);
out << buf;
}
@ -39,7 +50,7 @@ TEST(ReadBufferAIOTest, TestReadAfterAIO)
using namespace DB;
std::string data;
std::string file_path;
prepare_for_eof(file_path, data);
prepareForEOF(file_path, data);
ReadBufferAIO testbuf(file_path);
std::string newdata;

View File

@ -1,3 +1,4 @@
#include "Common/quoteString.h"
#include <Common/typeid_cast.h>
#include <Common/PODArray.h>
#include <Core/Row.h>
@ -334,7 +335,7 @@ void ActionsMatcher::visit(const ASTIdentifier & identifier, const ASTPtr & ast,
found = true;
if (found)
throw Exception("Column " + column_name.get(ast) + " is not under aggregate function and not in GROUP BY.",
throw Exception("Column " + backQuote(column_name.get(ast)) + " is not under aggregate function and not in GROUP BY",
ErrorCodes::NOT_AN_AGGREGATE);
/// Special check for WITH statement alias. Add alias action to be able to use this alias.

View File

@ -2,7 +2,6 @@
#include <Parsers/IAST.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/SubqueryForSet.h>
#include <Interpreters/InDepthNodeVisitor.h>
@ -13,6 +12,9 @@ namespace DB
class Context;
class ASTFunction;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// The case of an explicit enumeration of values.
SetPtr makeExplicitSet(
const ASTFunction * node, const Block & sample_block, bool create_ordered_set,

View File

@ -72,7 +72,8 @@ bool Cluster::Address::isLocal(UInt16 clickhouse_port) const
}
Cluster::Address::Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
Cluster::Address::Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, UInt32 shard_index_, UInt32 replica_index_) :
shard_index(shard_index_), replica_index(replica_index_)
{
host_name = config.getString(config_prefix + ".host");
port = static_cast<UInt16>(config.getInt(config_prefix + ".port"));
@ -137,12 +138,8 @@ std::pair<String, UInt16> Cluster::Address::fromString(const String & host_port_
String Cluster::Address::toFullString() const
{
return
escapeForFileName(user) +
(password.empty() ? "" : (':' + escapeForFileName(password))) + '@' +
escapeForFileName(host_name) + ':' +
std::to_string(port) +
(default_database.empty() ? "" : ('#' + escapeForFileName(default_database)))
+ ((secure == Protocol::Secure::Enable) ? "+secure" : "");
((shard_index == 0) ? "" : "shard" + std::to_string(shard_index)) +
((replica_index == 0) ? "" : "_replica" + std::to_string(replica_index));
}
Cluster::Address Cluster::Address::fromFullString(const String & full_string)
@ -150,35 +147,55 @@ Cluster::Address Cluster::Address::fromFullString(const String & full_string)
const char * address_begin = full_string.data();
const char * address_end = address_begin + full_string.size();
Protocol::Secure secure = Protocol::Secure::Disable;
const char * secure_tag = "+secure";
if (endsWith(full_string, secure_tag))
{
address_end -= strlen(secure_tag);
secure = Protocol::Secure::Enable;
}
const char * user_pw_end = strchr(full_string.data(), '@');
const char * colon = strchr(full_string.data(), ':');
if (!user_pw_end || !colon)
throw Exception("Incorrect user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR);
const bool has_pw = colon < user_pw_end;
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception("Incorrect address '" + full_string + "', it does not contain port", ErrorCodes::SYNTAX_ERROR);
/// parsing with the new [shard{shard_index}[_replica{replica_index}]] format
if (!user_pw_end && startsWith(full_string, "shard"))
{
const char * underscore = strchr(full_string.data(), '_');
const char * has_db = strchr(full_string.data(), '#');
const char * port_end = has_db ? has_db : address_end;
Address address;
address.shard_index = parse<UInt32>(address_begin + strlen("shard"));
address.replica_index = underscore ? parse<UInt32>(underscore + strlen("_replica")) : 0;
Address address;
address.secure = secure;
address.port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
address.host_name = unescapeForFileName(std::string(user_pw_end + 1, host_end));
address.user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string();
return address;
return address;
}
else
{
/// parsing with the old user[:password]@host:port#default_database format
/// This format turned out to be inconvenient for the following reasons:
/// - credentials are exposed in file name;
/// - the file name can be too long.
Protocol::Secure secure = Protocol::Secure::Disable;
const char * secure_tag = "+secure";
if (endsWith(full_string, secure_tag))
{
address_end -= strlen(secure_tag);
secure = Protocol::Secure::Enable;
}
const char * colon = strchr(full_string.data(), ':');
if (!user_pw_end || !colon)
throw Exception("Incorrect user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR);
const bool has_pw = colon < user_pw_end;
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception("Incorrect address '" + full_string + "', it does not contain port", ErrorCodes::SYNTAX_ERROR);
const char * has_db = strchr(full_string.data(), '#');
const char * port_end = has_db ? has_db : address_end;
Address address;
address.secure = secure;
address.port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
address.host_name = unescapeForFileName(std::string(user_pw_end + 1, host_end));
address.user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string();
return address;
}
}
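To illustrate the new directory-name format, here is a minimal standalone round-trip sketch of serializing and parsing shard{shard_index}[_replica{replica_index}]. It is illustrative code, not the ClickHouse implementation; it assumes the input is well-formed and that an index of 0 means the component is absent:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

/// Serialize: index 0 means the component is omitted.
std::string toFullStringSketch(std::uint32_t shard_index, std::uint32_t replica_index)
{
    return (shard_index == 0 ? "" : "shard" + std::to_string(shard_index))
        + (replica_index == 0 ? "" : "_replica" + std::to_string(replica_index));
}

/// Parse back; assumes a well-formed "shard{N}[_replica{M}]" string.
std::pair<std::uint32_t, std::uint32_t> fromFullStringSketch(const std::string & s)
{
    const std::size_t underscore = s.find('_');
    const std::uint32_t shard = std::stoul(s.substr(5, underscore - 5));  /// skip "shard"
    const std::uint32_t replica = (underscore == std::string::npos)
        ? 0
        : std::stoul(s.substr(underscore + 8));  /// skip "_replica"
    return {shard, replica};
}

int main()
{
    const std::string name = toFullStringSketch(2, 3);  /// "shard2_replica3"
    const auto [shard, replica] = fromFullStringSketch(name);
    std::cout << name << " -> shard " << shard << ", replica " << replica << '\n';
}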
@ -309,7 +326,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
if (startsWith(replica_key, "replica"))
{
replica_addresses.emplace_back(config, partial_prefix + replica_key);
replica_addresses.emplace_back(config, partial_prefix + replica_key, current_shard_num, current_replica_num);
++current_replica_num;
if (!replica_addresses.back().is_local)

View File

@ -57,6 +57,9 @@ public:
UInt16 port;
String user;
String password;
UInt32 shard_index{}; /// shard serial number in configuration file, starting from 1.
UInt32 replica_index{}; /// replica serial number in this shard, starting from 1; zero means no replicas.
/// This database is selected when no database is specified for Distributed table
String default_database;
/// The locality is determined at the initialization, and is not changed even if DNS is changed
@ -67,7 +70,7 @@ public:
Protocol::Secure secure = Protocol::Secure::Disable;
Address() = default;
Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, UInt32 shard_index_ = 0, UInt32 replica_index_ = 0);
Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_ = false);
/// Returns 'escaped_host_name:port'
@ -80,8 +83,10 @@ public:
static std::pair<String, UInt16> fromString(const String & host_port_string);
/// Returns escaped user:password@resolved_host_address:resolved_host_port#default_database
/// Returns escaped shard{shard_index}_replica{replica_index}
String toFullString() const;
/// Returns an address parsed either from the new format (only shard and replica indexes set) or from the old full format (without shard and replica indexes)
static Address fromFullString(const String & address_full_string);
/// Returns resolved address if it does resolve.

View File

@ -57,6 +57,7 @@
#include <Common/TraceCollector.h>
#include <common/logger_useful.h>
#include <Common/RemoteHostFilter.h>
#include <ext/singleton.h>
namespace ProfileEvents
{
@ -168,7 +169,6 @@ struct ContextShared
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
std::unique_ptr<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
class SessionKeyHash
@ -299,13 +299,7 @@ struct ContextShared
schedule_pool.reset();
ddl_worker.reset();
/// Stop trace collector if any
trace_collector.reset();
}
bool hasTraceCollector()
{
return trace_collector != nullptr;
ext::Singleton<TraceCollector>::reset();
}
void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
@ -313,7 +307,7 @@ struct ContextShared
if (trace_log == nullptr)
return;
trace_collector = std::make_unique<TraceCollector>(trace_log);
ext::Singleton<TraceCollector>()->setTraceLog(trace_log);
}
};
@ -650,6 +644,10 @@ void Context::checkAccess(const AccessFlags & access, const std::string_view & d
void Context::checkAccess(const AccessRightsElement & access) const { return checkAccessImpl(access); }
void Context::checkAccess(const AccessRightsElements & access) const { return checkAccessImpl(access); }
void Context::switchRowPolicy()
{
row_policy = getAccessControlManager().getRowPolicyContext(client_info.initial_user);
}
void Context::setUsersConfig(const ConfigurationPtr & config)
{
@ -1689,11 +1687,6 @@ void Context::initializeSystemLogs()
shared->system_logs.emplace(*global_context, getConfigRef());
}
bool Context::hasTraceCollector()
{
return shared->hasTraceCollector();
}
void Context::initializeTraceCollector()
{
shared->initializeTraceCollector(getTraceLog());

View File

@ -253,6 +253,10 @@ public:
std::shared_ptr<QuotaContext> getQuota() const { return quota; }
std::shared_ptr<RowPolicyContext> getRowPolicy() const { return row_policy; }
/// TODO: we need much better code for switching policies, quotas, access rights for initial user
/// Switches the row policy when client info contains an initial user
void switchRowPolicy();
/** Take the list of users, quotas and configuration profiles from this config.
* The list of users is completely replaced.
* The accumulated quota values are not reset if the quota is not deleted.

View File

@ -70,9 +70,51 @@ using LogAST = DebugASTLog<false>; /// set to true to enable logs
namespace ErrorCodes
{
extern const int UNKNOWN_IDENTIFIER;
extern const int ILLEGAL_PREWHERE;
extern const int LOGICAL_ERROR;
}
namespace
{
/// Check if there is an ignore() function call. It is used to disable constant folding in query
/// predicates because some performance tests use ignore() as a non-optimize guard.
bool allowEarlyConstantFolding(const ExpressionActions & actions, const Settings & settings)
{
if (!settings.enable_early_constant_folding)
return false;
for (auto & action : actions.getActions())
{
if (action.type == action.APPLY_FUNCTION && action.function_base)
{
auto name = action.function_base->getName();
if (name == "ignore")
return false;
}
}
return true;
}
}
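A minimal standalone sketch of the guard above; ActionSketch is an illustrative stand-in for ExpressionAction (the real code inspects action.type and function_base->getName()):

#include <iostream>
#include <string>
#include <vector>

struct ActionSketch
{
    bool is_function = false;
    std::string function_name;
};

/// Refuse early constant folding if any planned action calls ignore().
bool allowEarlyConstantFoldingSketch(const std::vector<ActionSketch> & actions, bool setting_enabled)
{
    if (!setting_enabled)
        return false;
    for (const auto & action : actions)
        if (action.is_function && action.function_name == "ignore")
            return false;  /// performance tests use ignore() as a non-optimize guard
    return true;
}

int main()
{
    const std::vector<ActionSketch> actions{{true, "plus"}, {true, "ignore"}};
    std::cout << std::boolalpha << allowEarlyConstantFoldingSketch(actions, true) << '\n';  /// false
}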
bool sanitizeBlock(Block & block)
{
for (auto & col : block)
{
if (!col.column)
{
if (isNotCreatable(col.type->getTypeId()))
return false;
col.column = col.type->createColumn();
}
else if (isColumnConst(*col.column) && !col.column->empty())
col.column = col.column->cloneEmpty();
}
return true;
}
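A minimal standalone sketch of what sanitizeBlock() does to a header-only sample block; ColumnSketch is an illustrative stand-in for ColumnWithTypeAndName (the real code uses IColumn, isNotCreatable and isColumnConst):

#include <iostream>
#include <memory>
#include <vector>

struct ColumnSketch
{
    std::shared_ptr<std::vector<int>> column;  /// stand-in for DB::IColumn
    bool creatable = true;                     /// stand-in for !isNotCreatable(type->getTypeId())
    bool is_const = false;                     /// stand-in for isColumnConst(*column)
};

bool sanitizeBlockSketch(std::vector<ColumnSketch> & block)
{
    for (auto & col : block)
    {
        if (!col.column)
        {
            if (!col.creatable)
                return false;                                  /// e.g. a Set or Function type
            col.column = std::make_shared<std::vector<int>>(); /// empty placeholder column
        }
        else if (col.is_const && !col.column->empty())
            col.column = std::make_shared<std::vector<int>>(); /// keep the type, drop the data
    }
    return true;
}

int main()
{
    std::vector<ColumnSketch> sample{{nullptr, true, false}};
    std::cout << std::boolalpha << sanitizeBlockSketch(sample) << '\n';  /// true: a placeholder was created
}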
ExpressionAnalyzer::ExpressionAnalyzer(
const ASTPtr & query_,
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
@ -733,7 +775,8 @@ void SelectQueryExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain,
step.required_output.push_back(child->getColumnName());
}
bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order)
bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order,
ManyExpressionActions & order_by_elements_actions)
{
const auto * select_query = getSelectQuery();
@ -884,12 +927,239 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
return actions;
}
void SelectQueryExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const
ExpressionActionsPtr SelectQueryExpressionAnalyzer::simpleSelectActions()
{
for (const auto & name_and_type : aggregation_keys)
key_names.emplace_back(name_and_type.name);
ExpressionActionsChain new_chain(context);
appendSelect(new_chain, false);
return new_chain.getLastActions();
}
aggregates = aggregate_descriptions;
ExpressionAnalysisResult::ExpressionAnalysisResult(
SelectQueryExpressionAnalyzer & query_analyzer,
bool first_stage_,
bool second_stage_,
bool only_types,
const FilterInfoPtr & filter_info_,
const Block & source_header)
: first_stage(first_stage_)
, second_stage(second_stage_)
, need_aggregate(query_analyzer.hasAggregation())
{
/// first_stage: Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
/// second_stage: Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
/** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
const ASTSelectQuery & query = *query_analyzer.getSelectQuery();
const Context & context = query_analyzer.context;
const Settings & settings = context.getSettingsRef();
const StoragePtr & storage = query_analyzer.storage();
bool finalized = false;
size_t where_step_num = 0;
auto finalizeChain = [&](ExpressionActionsChain & chain)
{
if (!finalized)
{
chain.finalize();
finalize(chain, context, where_step_num);
chain.clear();
}
finalized = true;
};
{
ExpressionActionsChain chain(context);
Names additional_required_columns_after_prewhere;
if (storage && (query.sample_size() || settings.parallel_replicas_count > 1))
{
Names columns_for_sampling = storage->getColumnsRequiredForSampling();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_sampling.begin(), columns_for_sampling.end());
}
if (storage && query.final())
{
Names columns_for_final = storage->getColumnsRequiredForFinal();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_final.begin(), columns_for_final.end());
}
if (storage && filter_info_)
{
filter_info = filter_info_;
query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name);
}
if (query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
{
prewhere_info = std::make_shared<PrewhereInfo>(
chain.steps.front().actions, query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
{
Block before_prewhere_sample = source_header;
if (sanitizeBlock(before_prewhere_sample))
{
prewhere_info->prewhere_actions->execute(before_prewhere_sample);
auto & column_elem = before_prewhere_sample.getByName(query.prewhere()->getColumnName());
/// If the filter column is a constant, record it.
if (column_elem.column)
prewhere_constant_filter_description = ConstantFilterDescription(*column_elem.column);
}
}
chain.addStep();
}
query_analyzer.appendArrayJoin(chain, only_types || !first_stage);
if (query_analyzer.appendJoin(chain, only_types || !first_stage))
{
before_join = chain.getLastActions();
if (!hasJoin())
throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR);
chain.addStep();
}
if (query_analyzer.appendWhere(chain, only_types || !first_stage))
{
where_step_num = chain.steps.size() - 1;
before_where = chain.getLastActions();
if (allowEarlyConstantFolding(*before_where, settings))
{
Block before_where_sample;
if (chain.steps.size() > 1)
before_where_sample = chain.steps[chain.steps.size() - 2].actions->getSampleBlock();
else
before_where_sample = source_header;
if (sanitizeBlock(before_where_sample))
{
before_where->execute(before_where_sample);
auto & column_elem = before_where_sample.getByName(query.where()->getColumnName());
/// If the filter column is a constant, record it.
if (column_elem.column)
where_constant_filter_description = ConstantFilterDescription(*column_elem.column);
}
}
chain.addStep();
}
if (need_aggregate)
{
query_analyzer.appendGroupBy(chain, only_types || !first_stage);
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage);
before_aggregation = chain.getLastActions();
finalizeChain(chain);
if (query_analyzer.appendHaving(chain, only_types || !second_stage))
{
before_having = chain.getLastActions();
chain.addStep();
}
}
bool has_stream_with_non_joined_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
optimize_read_in_order =
settings.optimize_read_in_order
&& storage && query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query.final()
&& !has_stream_with_non_joined_rows;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));
selected_columns = chain.getLastStep().required_output;
has_order_by = query_analyzer.appendOrderBy(chain, only_types || (need_aggregate ? !second_stage : !first_stage),
optimize_read_in_order, order_by_elements_actions);
before_order_and_select = chain.getLastActions();
chain.addStep();
if (query_analyzer.appendLimitBy(chain, only_types || !second_stage))
{
before_limit_by = chain.getLastActions();
chain.addStep();
}
query_analyzer.appendProjectResult(chain);
final_projection = chain.getLastActions();
finalizeChain(chain);
}
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
removeExtraColumns();
checkActions();
}
void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, const Context & context_, size_t where_step_num)
{
if (hasPrewhere())
{
const ExpressionActionsChain::Step & step = chain.steps.at(0);
prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0);
Names columns_to_remove;
for (size_t i = 1; i < step.required_output.size(); ++i)
{
if (step.can_remove_required_output[i])
columns_to_remove.push_back(step.required_output[i]);
}
if (!columns_to_remove.empty())
{
auto columns = prewhere_info->prewhere_actions->getSampleBlock().getNamesAndTypesList();
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(columns, context_);
for (const auto & column : columns_to_remove)
actions->add(ExpressionAction::removeColumn(column));
prewhere_info->remove_columns_actions = std::move(actions);
}
columns_to_remove_after_prewhere = std::move(columns_to_remove);
}
else if (hasFilter())
{
/// Can't have prewhere and filter set simultaneously
filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0);
}
if (hasWhere())
remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
}
void ExpressionAnalysisResult::removeExtraColumns()
{
if (hasFilter())
filter_info->actions->prependProjectInput();
if (hasWhere())
before_where->prependProjectInput();
if (hasHaving())
before_having->prependProjectInput();
}
void ExpressionAnalysisResult::checkActions()
{
/// Check that PREWHERE doesn't contain unusual actions. Unusual actions are those that can change the number of rows.
if (hasPrewhere())
{
auto check_actions = [](const ExpressionActionsPtr & actions)
{
if (actions)
for (const auto & action : actions->getActions())
if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("PREWHERE cannot contain ARRAY JOIN or JOIN action", ErrorCodes::ILLEGAL_PREWHERE);
};
check_actions(prewhere_info->prewhere_actions);
check_actions(prewhere_info->alias_actions);
check_actions(prewhere_info->remove_columns_actions);
}
}
}

Some files were not shown because too many files have changed in this diff.