Merge remote-tracking branch 'upstream/master' into update_libunwind

This commit is contained in:
Nikita Lapkov 2019-07-01 21:11:26 +00:00
commit 9641d058d0
247 changed files with 5117 additions and 2440 deletions

3
.gitmodules vendored
View File

@ -76,6 +76,9 @@
[submodule "contrib/brotli"]
path = contrib/brotli
url = https://github.com/google/brotli.git
[submodule "contrib/h3"]
path = contrib/h3
url = https://github.com/uber/h3
[submodule "contrib/hyperscan"]
path = contrib/hyperscan
url = https://github.com/ClickHouse-Extras/hyperscan.git

View File

@ -297,6 +297,24 @@ if (USE_INCLUDE_WHAT_YOU_USE)
endif()
endif ()
# Using clang-tidy static analyzer http://mariobadr.com/using-clang-tidy-with-cmake-36.html https://cmake.org/cmake/help/v3.6/prop_tgt/LANG_CLANG_TIDY.html
option (ENABLE_CLANG_TIDY "Use 'clang-tidy' static analyzer" OFF)
if (ENABLE_CLANG_TIDY)
if (${CMAKE_VERSION} VERSION_LESS "3.6.0")
message(FATAL_ERROR "clang-tidy requires CMake version at least 3.6.")
endif()
find_program (CLANG_TIDY_EXE NAMES "clang-tidy" DOC "Path to clang-tidy executable")
if (NOT CLANG_TIDY_EXE)
set (USE_CLANG_TIDY 0)
message (STATUS "clang-tidy not found.")
else ()
set (USE_CLANG_TIDY 1)
message (STATUS "clang-tidy found: ${CLANG_TIDY_EXE}")
set (DO_CLANG_TIDY "${CLANG_TIDY_EXE}" "-checks=*,-clang-analyzer-alpha.*")
# You can enable it within a directory by: set (CMAKE_CXX_CLANG_TIDY "${DO_CLANG_TIDY}")
endif ()
endif ()
if (ENABLE_TESTS)
message (STATUS "Tests are enabled")
endif ()
@ -347,6 +365,7 @@ include (cmake/find_libgsasl.cmake)
include (cmake/find_rdkafka.cmake)
include (cmake/find_capnp.cmake)
include (cmake/find_llvm.cmake)
include (cmake/find_h3.cmake)
include (cmake/find_cpuid.cmake) # Freebsd, bundled
if (NOT USE_CPUID)
include (cmake/find_cpuinfo.cmake) # Debian
@ -427,6 +446,7 @@ if (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING)
add_default_dependencies(kj)
add_default_dependencies(simdjson)
add_default_dependencies(apple_rt)
add_default_dependencies(h3)
add_default_dependencies(re2)
add_default_dependencies(re2_st)
add_default_dependencies(hs_compile_shared)

View File

@ -7,9 +7,9 @@ WHAT=$1
[[ $EUID -ne 0 ]] && SUDO=sudo
command -v apt-get && PACKAGE_MANAGER=apt
command -v yum && PACKAGE_MANAGER=yum
command -v pkg && PACKAGE_MANAGER=pkg
command -v apt-get && PACKAGE_MANAGER=apt
case $PACKAGE_MANAGER in

19
cmake/find_h3.cmake Normal file
View File

@ -0,0 +1,19 @@
option (USE_INTERNAL_H3_LIBRARY "Set to FALSE to use system h3 library instead of bundled" ${NOT_UNBUNDLED})
set (H3_INCLUDE_PATHS /usr/local/include/h3)
if (USE_INTERNAL_H3_LIBRARY)
set (H3_LIBRARY h3)
set (H3_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/h3/src/h3lib/include)
else ()
find_library (H3_LIBRARY h3)
find_path (H3_INCLUDE_DIR NAMES h3api.h PATHS ${H3_INCLUDE_PATHS})
endif ()
if (H3_LIBRARY AND H3_INCLUDE_DIR)
set (USE_H3 1)
else ()
set (USE_H3 0)
endif ()
message (STATUS "Using h3=${USE_H3}: ${H3_INCLUDE_DIR} : ${H3_LIBRARY}")

View File

@ -106,6 +106,10 @@ if (USE_INTERNAL_CPUID_LIBRARY)
add_subdirectory (libcpuid)
endif ()
if (USE_INTERNAL_H3_LIBRARY)
add_subdirectory(h3-cmake)
endif ()
if (USE_INTERNAL_SSL_LIBRARY)
if (NOT MAKE_STATIC_LIBRARIES)
set (BUILD_SHARED 1)

1
contrib/h3 vendored Submodule

@ -0,0 +1 @@
Subproject commit 6cfd649e8c0d3ed913e8aae928a669fc3b8a2365

View File

@ -0,0 +1,27 @@
set(H3_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/h3/src/h3lib)
set(H3_BINARY_DIR ${ClickHouse_BINARY_DIR}/contrib/h3/src/h3lib)
set(SRCS
${H3_SOURCE_DIR}/lib/algos.c
${H3_SOURCE_DIR}/lib/baseCells.c
${H3_SOURCE_DIR}/lib/bbox.c
${H3_SOURCE_DIR}/lib/coordijk.c
${H3_SOURCE_DIR}/lib/faceijk.c
${H3_SOURCE_DIR}/lib/geoCoord.c
${H3_SOURCE_DIR}/lib/h3Index.c
${H3_SOURCE_DIR}/lib/h3UniEdge.c
${H3_SOURCE_DIR}/lib/linkedGeo.c
${H3_SOURCE_DIR}/lib/localij.c
${H3_SOURCE_DIR}/lib/mathExtensions.c
${H3_SOURCE_DIR}/lib/polygon.c
${H3_SOURCE_DIR}/lib/vec2d.c
${H3_SOURCE_DIR}/lib/vec3d.c
${H3_SOURCE_DIR}/lib/vertexGraph.c
)
configure_file(${H3_SOURCE_DIR}/include/h3api.h.in ${H3_BINARY_DIR}/include/h3api.h)
add_library(h3 ${SRCS})
target_include_directories(h3 SYSTEM PUBLIC ${H3_SOURCE_DIR}/include)
target_include_directories(h3 SYSTEM PUBLIC ${H3_BINARY_DIR}/include)
target_compile_definitions(h3 PRIVATE H3_HAVE_VLA)

View File

@ -15,8 +15,13 @@ IF(CMAKE_COMPILER_IS_GNUCXX)
STRING(REGEX MATCHALL "[0-9]+" GCC_COMPILER_VERSION ${GCC_COMPILER_VERSION})
LIST(LENGTH GCC_COMPILER_VERSION GCC_COMPILER_VERSION_LENGTH)
LIST(GET GCC_COMPILER_VERSION 0 GCC_COMPILER_VERSION_MAJOR)
LIST(GET GCC_COMPILER_VERSION 0 GCC_COMPILER_VERSION_MINOR)
if (GCC_COMPILER_VERSION_LENGTH GREATER 1)
LIST(GET GCC_COMPILER_VERSION 1 GCC_COMPILER_VERSION_MINOR)
else ()
set (GCC_COMPILER_VERSION_MINOR 0)
endif ()
SET(GCC_COMPILER_VERSION_MAJOR ${GCC_COMPILER_VERSION_MAJOR} CACHE INTERNAL "gcc major version")
SET(GCC_COMPILER_VERSION_MINOR ${GCC_COMPILER_VERSION_MINOR} CACHE INTERNAL "gcc minor version")

View File

@ -2,6 +2,10 @@ if (USE_INCLUDE_WHAT_YOU_USE)
set (CMAKE_CXX_INCLUDE_WHAT_YOU_USE ${IWYU_PATH})
endif ()
if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${DO_CLANG_TIDY}")
endif ()
if(COMPILER_PIPE)
set(MAX_COMPILER_MEMORY 2500)
else()

View File

@ -59,6 +59,7 @@
#include <Parsers/parseQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
#include <Functions/registerFunctions.h>
@ -202,6 +203,9 @@ private:
/// External tables info.
std::list<ExternalTable> external_tables;
/// Dictionary with query parameters for prepared statements.
NameToNameMap query_parameters;
ConnectionParameters connection_parameters;
void initialize(Poco::Util::Application & self)
@ -795,7 +799,6 @@ private:
/// Some parts of a query (result output and formatting) are executed client-side.
/// Thus we need to parse the query.
parsed_query = parsed_query_;
if (!parsed_query)
{
const char * begin = query.data();
@ -900,6 +903,16 @@ private:
/// Process the query that doesn't require transferring data blocks to the server.
void processOrdinaryQuery()
{
/// We will always rewrite query (even if there are no query_parameters) because it will help to find errors in query formatter.
{
/// Replace ASTQueryParameter with ASTLiteral for prepared statements.
ReplaceQueryParameterVisitor visitor(query_parameters);
visitor.visit(parsed_query);
/// Get new query after substitutions. Note that it cannot be done for INSERT query with embedded data.
query = serializeAST(*parsed_query);
}
connection->sendQuery(connection_parameters.timeouts, query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
receiveResult();
@ -1548,7 +1561,8 @@ public:
/** We allow different groups of arguments:
* - common arguments;
* - arguments for any number of external tables each in form "--external args...",
* where possible args are file, name, format, structure, types.
* where possible args are file, name, format, structure, types;
* - param arguments for prepared statements.
* Split these groups before processing.
*/
using Arguments = std::vector<std::string>;
@ -1597,6 +1611,30 @@ public:
else
{
in_external_group = false;
/// Parameter arg after underline.
if (startsWith(arg, "--param_"))
{
const char * param_continuation = arg + strlen("--param_");
const char * equal_pos = strchr(param_continuation, '=');
if (equal_pos == param_continuation)
throw Exception("Parameter name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
if (equal_pos)
{
/// param_name=value
query_parameters.emplace(String(param_continuation, equal_pos), String(equal_pos + 1));
}
else
{
/// param_name value
++arg_num;
arg = argv[arg_num];
query_parameters.emplace(String(param_continuation), String(arg));
}
}
else
common_arguments.emplace_back(arg);
}
}
@ -1672,6 +1710,7 @@ public:
("structure", po::value<std::string>(), "structure")
("types", po::value<std::string>(), "types")
;
/// Parse main commandline options.
po::parsed_options parsed = po::command_line_parser(common_arguments).options(main_description).run();
po::variables_map options;
@ -1696,6 +1735,7 @@ public:
{
std::cout << main_description << "\n";
std::cout << external_description << "\n";
std::cout << "In addition, --param_name=value can be specified for substitution of parameters for parametrized queries.\n";
exit(0);
}

View File

@ -29,6 +29,11 @@
//#include "includes.h"
#include "config_client.h"
// Should not be included on BSD systems, but if it happen...
#ifdef HAVE_READPASSPHRASE
# include_next <readpassphrase.h>
#endif
#ifndef HAVE_READPASSPHRASE
# ifdef __cplusplus

View File

@ -475,9 +475,9 @@ void HTTPHandler::processQuery(
settings.readonly = 2;
}
bool isExternalData = startsWith(request.getContentType().data(), "multipart/form-data");
bool has_external_data = startsWith(request.getContentType().data(), "multipart/form-data");
if (isExternalData)
if (has_external_data)
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
reserved_param_suffixes.reserve(3);
@ -501,6 +501,12 @@ void HTTPHandler::processQuery(
else if (param_could_be_skipped(key))
{
}
else if (startsWith(key, "param_"))
{
/// Save name and values of substitution in dictionary.
const String parameter_name = key.substr(strlen("param_"));
context.setQueryParameter(parameter_name, value);
}
else
{
/// All other query parameters are treated as settings.
@ -516,7 +522,7 @@ void HTTPHandler::processQuery(
std::string full_query;
/// Support for "external data for query processing".
if (isExternalData)
if (has_external_data)
{
ExternalTablesHandler handler(context, params);
params.load(request, istr, handler);

View File

@ -47,8 +47,7 @@ struct AggregateFunctionSequenceMatchData final
using Comparator = ComparePairFirst<std::less>;
bool sorted = true;
static constexpr size_t bytes_in_arena = 64;
PODArray<TimestampEvents, bytes_in_arena, AllocatorWithStackMemory<Allocator<false>, bytes_in_arena>> events_list;
PODArrayWithStackMemory<TimestampEvents, 64> events_list;
void add(const Timestamp timestamp, const Events & events)
{
@ -203,8 +202,7 @@ private:
PatternAction(const PatternActionType type, const std::uint64_t extra = 0) : type{type}, extra{extra} {}
};
static constexpr size_t bytes_on_stack = 64;
using PatternActions = PODArray<PatternAction, bytes_on_stack, AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>>;
using PatternActions = PODArrayWithStackMemory<PatternAction, 64>;
Derived & derived() { return static_cast<Derived &>(*this); }

View File

@ -68,9 +68,8 @@ struct AggregateFunctionTimeSeriesGroupSumData
}
};
static constexpr size_t bytes_on_stack = 128;
typedef std::map<UInt64, Points> Series;
typedef PODArray<DataPoint, bytes_on_stack, AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>> AggSeries;
typedef PODArrayWithStackMemory<DataPoint, 128> AggSeries;
Series ss;
AggSeries result;

View File

@ -35,10 +35,7 @@ template <typename T>
struct AggregateFunctionWindowFunnelData
{
using TimestampEvent = std::pair<T, UInt8>;
static constexpr size_t bytes_on_stack = 64;
using TimestampEvents = PODArray<TimestampEvent, bytes_on_stack, AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>>;
using TimestampEvents = PODArray<TimestampEvent, 64>;
using Comparator = ComparePairFirst;
bool sorted = true;

View File

@ -27,8 +27,7 @@ struct QuantileExact
{
/// The memory will be allocated to several elements at once, so that the state occupies 64 bytes.
static constexpr size_t bytes_in_arena = 64 - sizeof(PODArray<Value>);
using Array = PODArray<Value, bytes_in_arena, AllocatorWithStackMemory<Allocator<false>, bytes_in_arena>>;
using Array = PODArrayWithStackMemory<Value, bytes_in_arena>;
Array array;
void add(const Value & x)

View File

@ -86,8 +86,7 @@ class QuantileTDigest
/// The memory will be allocated to several elements at once, so that the state occupies 64 bytes.
static constexpr size_t bytes_in_arena = 128 - sizeof(PODArray<Centroid>) - sizeof(Count) - sizeof(UInt32);
using Summary = PODArray<Centroid, bytes_in_arena / sizeof(Centroid), AllocatorWithStackMemory<Allocator<false>, bytes_in_arena>>;
using Summary = PODArrayWithStackMemory<Centroid, bytes_in_arena>;
Summary summary;
Count count = 0;

View File

@ -194,8 +194,7 @@ private:
friend void rs_perf_test();
/// We allocate a little memory on the stack - to avoid allocations when there are many objects with a small number of elements.
static constexpr size_t bytes_on_stack = 64;
using Array = DB::PODArray<T, bytes_on_stack / sizeof(T), AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>>;
using Array = DB::PODArrayWithStackMemory<T, 64>;
size_t sample_count;
size_t total_values = 0;

View File

@ -164,9 +164,8 @@ public:
private:
/// We allocate some memory on the stack to avoid allocations when there are many objects with a small number of elements.
static constexpr size_t bytes_on_stack = 64;
using Element = std::pair<T, UInt32>;
using Array = DB::PODArray<Element, bytes_on_stack / sizeof(Element), AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>>;
using Array = DB::PODArray<Element, 64>;
size_t sample_count;
size_t total_values{};

View File

@ -99,6 +99,11 @@ public:
return data->getBool(0);
}
Float64 getFloat64(size_t) const override
{
return data->getFloat64(0);
}
bool isNullAt(size_t) const override
{
return data->isNullAt(0);

View File

@ -57,6 +57,8 @@ public:
UInt64 get64(size_t n) const override { return getDictionary().get64(getIndexes().getUInt(n)); }
UInt64 getUInt(size_t n) const override { return getDictionary().getUInt(getIndexes().getUInt(n)); }
Int64 getInt(size_t n) const override { return getDictionary().getInt(getIndexes().getUInt(n)); }
Float64 getFloat64(size_t n) const override { return getDictionary().getInt(getIndexes().getFloat64(n)); }
bool getBool(size_t n) const override { return getDictionary().getInt(getIndexes().getBool(n)); }
bool isNullAt(size_t n) const override { return getDictionary().isNullAt(getIndexes().getUInt(n)); }
ColumnPtr cut(size_t start, size_t length) const override
{

View File

@ -64,6 +64,8 @@ public:
UInt64 get64(size_t n) const override { return getNestedColumn()->get64(n); }
UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); }
Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); }
Float64 getFloat64(size_t n) const override { return getNestedColumn()->getFloat64(n); }
bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void updateHashWithValue(size_t n, SipHash & hash_func) const override

View File

@ -33,7 +33,7 @@ template <typename T>
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
auto pos = arena.allocContinue(sizeof(T), begin);
unalignedStore(pos, data[n]);
unalignedStore<T>(pos, data[n]);
return StringRef(pos, sizeof(T));
}

View File

@ -430,6 +430,8 @@ namespace ErrorCodes
extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES = 453;
extern const int OPENSSL_ERROR = 454;
extern const int SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY = 455;
extern const int UNKNOWN_QUERY_PARAMETER = 456;
extern const int BAD_QUERY_PARAMETER = 457;
extern const int CANNOT_UNLINK = 458;
extern const int KEEPER_EXCEPTION = 999;

View File

@ -0,0 +1,43 @@
#include <Common/config.h>
#if USE_MIMALLOC
#include "MiAllocator.h"
#include <mimalloc.h>
namespace DB
{
void * MiAllocator::alloc(size_t size, size_t alignment)
{
if (alignment == 0)
return mi_malloc(size);
else
return mi_malloc_aligned(size, alignment);
}
void MiAllocator::free(void * buf, size_t)
{
mi_free(buf);
}
void * MiAllocator::realloc(void * old_ptr, size_t, size_t new_size, size_t alignment)
{
if (old_ptr == nullptr)
return alloc(new_size, alignment);
if (new_size == 0)
{
mi_free(old_ptr);
return nullptr;
}
if (alignment == 0)
return mi_realloc(old_ptr, alignment);
return mi_realloc_aligned(old_ptr, new_size, alignment);
}
}
#endif

View File

@ -6,7 +6,6 @@
#error "do not include this file until USE_MIMALLOC is set to 1"
#endif
#include <mimalloc.h>
#include <cstddef>
namespace DB
@ -19,37 +18,11 @@ namespace DB
*/
struct MiAllocator
{
static void * alloc(size_t size, size_t alignment = 0);
static void * alloc(size_t size, size_t alignment = 0)
{
if (alignment == 0)
return mi_malloc(size);
else
return mi_malloc_aligned(size, alignment);
}
static void free(void * buf, size_t)
{
mi_free(buf);
}
static void * realloc(void * old_ptr, size_t, size_t new_size, size_t alignment = 0)
{
if (old_ptr == nullptr)
return alloc(new_size, alignment);
if (new_size == 0)
{
mi_free(old_ptr);
return nullptr;
}
if (alignment == 0)
return mi_realloc(old_ptr, alignment);
return mi_realloc_aligned(old_ptr, new_size, alignment);
}
static void free(void * buf, size_t);
static void * realloc(void * old_ptr, size_t, size_t new_size, size_t alignment = 0);
};
}

View File

@ -45,7 +45,7 @@ inline constexpr size_t integerRoundUp(size_t value, size_t dividend)
* Only part of the std::vector interface is supported.
*
* The default constructor creates an empty object that does not allocate memory.
* Then the memory is allocated at least INITIAL_SIZE bytes.
* Then the memory is allocated at least initial_bytes bytes.
*
* If you insert elements with push_back, without making a `reserve`, then PODArray is about 2.5 times faster than std::vector.
*
@ -74,7 +74,7 @@ extern const char EmptyPODArray[EmptyPODArraySize];
/** Base class that depend only on size of element, not on element itself.
* You can static_cast to this class if you want to insert some data regardless to the actual type T.
*/
template <size_t ELEMENT_SIZE, size_t INITIAL_SIZE, typename TAllocator, size_t pad_right_, size_t pad_left_>
template <size_t ELEMENT_SIZE, size_t initial_bytes, typename TAllocator, size_t pad_right_, size_t pad_left_>
class PODArrayBase : private boost::noncopyable, private TAllocator /// empty base optimization
{
protected:
@ -161,7 +161,8 @@ protected:
{
// The allocated memory should be multiplication of ELEMENT_SIZE to hold the element, otherwise,
// memory issue such as corruption could appear in edge case.
realloc(std::max(((INITIAL_SIZE - 1) / ELEMENT_SIZE + 1) * ELEMENT_SIZE, minimum_memory_for_elements(1)),
realloc(std::max(integerRoundUp(initial_bytes, ELEMENT_SIZE),
minimum_memory_for_elements(1)),
std::forward<TAllocatorParams>(allocator_params)...);
}
else
@ -257,11 +258,11 @@ public:
}
};
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>, size_t pad_right_ = 0, size_t pad_left_ = 0>
class PODArray : public PODArrayBase<sizeof(T), INITIAL_SIZE, TAllocator, pad_right_, pad_left_>
template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocator<false>, size_t pad_right_ = 0, size_t pad_left_ = 0>
class PODArray : public PODArrayBase<sizeof(T), initial_bytes, TAllocator, pad_right_, pad_left_>
{
protected:
using Base = PODArrayBase<sizeof(T), INITIAL_SIZE, TAllocator, pad_right_, pad_left_>;
using Base = PODArrayBase<sizeof(T), initial_bytes, TAllocator, pad_right_, pad_left_>;
T * t_start() { return reinterpret_cast<T *>(this->c_start); }
T * t_end() { return reinterpret_cast<T *>(this->c_end); }
@ -618,17 +619,23 @@ public:
}
};
template <typename T, size_t INITIAL_SIZE, typename TAllocator, size_t pad_right_>
void swap(PODArray<T, INITIAL_SIZE, TAllocator, pad_right_> & lhs, PODArray<T, INITIAL_SIZE, TAllocator, pad_right_> & rhs)
template <typename T, size_t initial_bytes, typename TAllocator, size_t pad_right_>
void swap(PODArray<T, initial_bytes, TAllocator, pad_right_> & lhs, PODArray<T, initial_bytes, TAllocator, pad_right_> & rhs)
{
lhs.swap(rhs);
}
/** For columns. Padding is enough to read and write xmm-register at the address of the last element. */
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>>
using PaddedPODArray = PODArray<T, INITIAL_SIZE, TAllocator, 15, 16>;
template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocator<false>>
using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 15, 16>;
template <typename T, size_t stack_size_in_bytes>
using PODArrayWithStackMemory = PODArray<T, 0, AllocatorWithStackMemory<Allocator<false>, integerRoundUp(stack_size_in_bytes, sizeof(T))>>;
/** A helper for declaring PODArray that uses inline memory.
* The initial size is set to use all the inline bytes, since using less would
* only add some extra allocation calls.
*/
template <typename T, size_t inline_bytes,
size_t rounded_bytes = integerRoundUp(inline_bytes, sizeof(T))>
using PODArrayWithStackMemory = PODArray<T, rounded_bytes,
AllocatorWithStackMemory<Allocator<false>, rounded_bytes>>;
}

View File

@ -30,10 +30,18 @@ template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
{
auto on_error = []
auto on_error = [&]
{
if constexpr (std::is_same_v<ReturnType, void>)
{
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
throw DB::Exception("Cannot schedule a task", DB::ErrorCodes::CANNOT_SCHEDULE_TASK);
}
else
return false;
};

View File

@ -41,9 +41,6 @@ target_link_libraries (compact_array PRIVATE clickhouse_common_io ${Boost_FILESY
add_executable (radix_sort radix_sort.cpp)
target_link_libraries (radix_sort PRIVATE clickhouse_common_io)
add_executable (shell_command_test shell_command_test.cpp)
target_link_libraries (shell_command_test PRIVATE clickhouse_common_io)
add_executable (arena_with_free_lists arena_with_free_lists.cpp)
target_link_libraries (arena_with_free_lists PRIVATE clickhouse_compression clickhouse_common_io)
@ -53,15 +50,6 @@ target_link_libraries (pod_array PRIVATE clickhouse_common_io)
add_executable (thread_creation_latency thread_creation_latency.cpp)
target_link_libraries (thread_creation_latency PRIVATE clickhouse_common_io)
add_executable (thread_pool thread_pool.cpp)
target_link_libraries (thread_pool PRIVATE clickhouse_common_io)
add_executable (thread_pool_2 thread_pool_2.cpp)
target_link_libraries (thread_pool_2 PRIVATE clickhouse_common_io)
add_executable (thread_pool_3 thread_pool_3.cpp)
target_link_libraries (thread_pool_3 PRIVATE clickhouse_common_io)
add_executable (multi_version multi_version.cpp)
target_link_libraries (multi_version PRIVATE clickhouse_common_io)
add_check(multi_version)

View File

@ -0,0 +1,72 @@
#include <iostream>
#include <Core/Types.h>
#include <Common/ShellCommand.h>
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <chrono>
#include <thread>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
using namespace DB;
TEST(ShellCommand, Execute)
{
auto command = ShellCommand::execute("echo 'Hello, world!'");
std::string res;
readStringUntilEOF(res, command->out);
command->wait();
EXPECT_EQ(res, "Hello, world!\n");
}
TEST(ShellCommand, ExecuteDirect)
{
auto command = ShellCommand::executeDirect("/bin/echo", {"Hello, world!"});
std::string res;
readStringUntilEOF(res, command->out);
command->wait();
EXPECT_EQ(res, "Hello, world!\n");
}
TEST(ShellCommand, ExecuteWithInput)
{
auto command = ShellCommand::execute("cat");
String in_str = "Hello, world!\n";
ReadBufferFromString in(in_str);
copyData(in, command->in);
command->in.close();
std::string res;
readStringUntilEOF(res, command->out);
command->wait();
EXPECT_EQ(res, "Hello, world!\n");
}
TEST(ShellCommand, AutoWait)
{
// <defunct> hunting:
for (int i = 0; i < 1000; ++i)
{
auto command = ShellCommand::execute("echo " + std::to_string(i));
//command->wait(); // now automatic
}
// std::cerr << "inspect me: ps auxwwf" << "\n";
// std::this_thread::sleep_for(std::chrono::seconds(100));
}

View File

@ -1,11 +1,18 @@
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
/** Reproduces bug in ThreadPool.
* It get stuck if we call 'wait' many times from many other threads simultaneously.
*/
int main(int, char **)
TEST(ThreadPool, ConcurrentWait)
{
auto worker = []
{
@ -29,6 +36,4 @@ int main(int, char **)
waiting_pool.schedule([&pool]{ pool.wait(); });
waiting_pool.wait();
return 0;
}

View File

@ -0,0 +1,32 @@
#include <atomic>
#include <iostream>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
/// Test for thread self-removal when number of free threads in pool is too large.
/// Just checks that nothing weird happens.
template <typename Pool>
int test()
{
Pool pool(10, 2, 10);
std::atomic<int> counter{0};
for (size_t i = 0; i < 10; ++i)
pool.schedule([&]{ ++counter; });
pool.wait();
return counter;
}
TEST(ThreadPool, ThreadRemoval)
{
EXPECT_EQ(test<FreeThreadPool>(), 10);
EXPECT_EQ(test<ThreadPool>(), 10);
}

View File

@ -2,10 +2,17 @@
#include <iostream>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
int main(int, char **)
TEST(ThreadPool, Loop)
{
std::atomic<size_t> res{0};
std::atomic<int> res{0};
for (size_t i = 0; i < 1000; ++i)
{
@ -16,6 +23,5 @@ int main(int, char **)
pool.wait();
}
std::cerr << res << "\n";
return 0;
EXPECT_EQ(res, 16000);
}

View File

@ -0,0 +1,38 @@
#include <iostream>
#include <stdexcept>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
bool check()
{
ThreadPool pool(10);
pool.schedule([]{ throw std::runtime_error("Hello, world!"); });
try
{
for (size_t i = 0; i < 100; ++i)
pool.schedule([]{}); /// An exception will be rethrown from this method.
}
catch (const std::runtime_error &)
{
return true;
}
pool.wait();
return false;
}
TEST(ThreadPool, ExceptionFromSchedule)
{
EXPECT_TRUE(check());
}

View File

@ -1,63 +0,0 @@
#include <iostream>
#include <Core/Types.h>
#include <Common/ShellCommand.h>
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromString.h>
#include <chrono>
#include <thread>
using namespace DB;
int main(int, char **)
try
{
{
auto command = ShellCommand::execute("echo 'Hello, world!'");
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
{
auto command = ShellCommand::executeDirect("/bin/echo", {"Hello, world!"});
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
{
auto command = ShellCommand::execute("cat");
String in_str = "Hello, world!\n";
ReadBufferFromString in(in_str);
copyData(in, command->in);
command->in.close();
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
// <defunct> hunting:
for (int i = 0; i < 1000; ++i)
{
auto command = ShellCommand::execute("echo " + std::to_string(i));
//command->wait(); // now automatic
}
// std::cerr << "inspect me: ps auxwwf" << "\n";
// std::this_thread::sleep_for(std::chrono::seconds(100));
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false) << "\n";
return 1;
}

View File

@ -1,27 +0,0 @@
#include <mutex>
#include <iostream>
#include <Common/ThreadPool.h>
/// Test for thread self-removal when number of free threads in pool is too large.
/// Just checks that nothing weird happens.
template <typename Pool>
void test()
{
Pool pool(10, 2, 10);
std::mutex mutex;
for (size_t i = 0; i < 10; ++i)
pool.schedule([&]{ std::lock_guard lock(mutex); std::cerr << '.'; });
pool.wait();
}
int main(int, char **)
{
test<FreeThreadPool>();
std::cerr << '\n';
test<ThreadPool>();
std::cerr << '\n';
return 0;
}

View File

@ -67,7 +67,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
while (source < source_end)
{
accumulator += unalignedLoad<T>(source);
unalignedStore(dest, accumulator);
unalignedStore<T>(dest, accumulator);
source += sizeof(T);
dest += sizeof(T);

View File

@ -90,7 +90,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
const char * source_end = source + source_size;
const UInt32 items_count = source_size / sizeof(T);
unalignedStore(dest, items_count);
unalignedStore<UInt32>(dest, items_count);
dest += sizeof(items_count);
T prev_value{};
@ -99,7 +99,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
if (source < source_end)
{
prev_value = unalignedLoad<T>(source);
unalignedStore(dest, prev_value);
unalignedStore<T>(dest, prev_value);
source += sizeof(prev_value);
dest += sizeof(prev_value);
@ -109,7 +109,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
{
const T curr_value = unalignedLoad<T>(source);
prev_delta = static_cast<DeltaType>(curr_value - prev_value);
unalignedStore(dest, prev_delta);
unalignedStore<T>(dest, prev_delta);
source += sizeof(curr_value);
dest += sizeof(prev_delta);
@ -164,7 +164,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
if (source < source_end)
{
prev_value = unalignedLoad<T>(source);
unalignedStore(dest, prev_value);
unalignedStore<T>(dest, prev_value);
source += sizeof(prev_value);
dest += sizeof(prev_value);
@ -174,7 +174,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
{
prev_delta = unalignedLoad<DeltaType>(source);
prev_value = static_cast<T>(prev_value + prev_delta);
unalignedStore(dest, prev_value);
unalignedStore<T>(dest, prev_value);
source += sizeof(prev_delta);
dest += sizeof(prev_value);
@ -209,7 +209,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
// else if first bit is zero, no need to read more data.
const T curr_value = static_cast<T>(prev_value + prev_delta + double_delta);
unalignedStore(dest, curr_value);
unalignedStore<T>(dest, curr_value);
dest += sizeof(curr_value);
prev_delta = curr_value - prev_value;

View File

@ -94,7 +94,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
const UInt32 items_count = source_size / sizeof(T);
unalignedStore(dest, items_count);
unalignedStore<UInt32>(dest, items_count);
dest += sizeof(items_count);
T prev_value{};
@ -104,7 +104,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
if (source < source_end)
{
prev_value = unalignedLoad<T>(source);
unalignedStore(dest, prev_value);
unalignedStore<T>(dest, prev_value);
source += sizeof(prev_value);
dest += sizeof(prev_value);
@ -166,7 +166,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
if (source < source_end)
{
prev_value = unalignedLoad<T>(source);
unalignedStore(dest, prev_value);
unalignedStore<T>(dest, prev_value);
source += sizeof(prev_value);
dest += sizeof(prev_value);
@ -210,7 +210,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
}
// else: 0b0 prefix - use prev_value
unalignedStore(dest, curr_value);
unalignedStore<T>(dest, curr_value);
dest += sizeof(curr_value);
prev_xored_info = curr_xored_info;

View File

@ -390,7 +390,7 @@ void decompressData(const char * src, UInt32 bytes_size, char * dst, UInt32 unco
{
_T min_value = min;
for (UInt32 i = 0; i < num_elements; ++i, dst += sizeof(_T))
unalignedStore(dst, min_value);
unalignedStore<_T>(dst, min_value);
return;
}

View File

@ -200,7 +200,7 @@ inline void copyOverlap8Shuffle(UInt8 * op, const UInt8 *& match, const size_t o
0, 1, 2, 3, 4, 5, 6, 0,
};
unalignedStore(op, vtbl1_u8(unalignedLoad<uint8x8_t>(match), unalignedLoad<uint8x8_t>(masks + 8 * offset)));
unalignedStore<uint8x8_t>(op, vtbl1_u8(unalignedLoad<uint8x8_t>(match), unalignedLoad<uint8x8_t>(masks + 8 * offset)));
match += masks[offset];
}
@ -328,10 +328,10 @@ inline void copyOverlap16Shuffle(UInt8 * op, const UInt8 *& match, const size_t
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 0,
};
unalignedStore(op,
unalignedStore<uint8x8_t>(op,
vtbl2_u8(unalignedLoad<uint8x8x2_t>(match), unalignedLoad<uint8x8_t>(masks + 16 * offset)));
unalignedStore(op + 8,
unalignedStore<uint8x8_t>(op + 8,
vtbl2_u8(unalignedLoad<uint8x8x2_t>(match), unalignedLoad<uint8x8_t>(masks + 16 * offset + 8)));
match += masks[offset];

View File

@ -336,6 +336,7 @@ MutableColumns Block::mutateColumns()
void Block::setColumns(MutableColumns && columns)
{
/// TODO: assert if |columns| doesn't match |data|!
size_t num_columns = data.size();
for (size_t i = 0; i < num_columns; ++i)
data[i].column = std::move(columns[i]);
@ -344,6 +345,7 @@ void Block::setColumns(MutableColumns && columns)
void Block::setColumns(const Columns & columns)
{
/// TODO: assert if |columns| doesn't match |data|!
size_t num_columns = data.size();
for (size_t i = 0; i < num_columns; ++i)
data[i].column = columns[i];

View File

@ -88,7 +88,7 @@
#define PLATFORM_NOT_SUPPORTED "The only supported platforms are x86_64 and AArch64, PowerPC (work in progress)"
#if !defined(__x86_64__) && !defined(__aarch64__) && !defined(__PPC__)
// #error PLATFORM_NOT_SUPPORTED
#error PLATFORM_NOT_SUPPORTED
#endif
/// Check for presence of address sanitizer
@ -114,10 +114,12 @@
#if defined(__clang__)
#define NO_SANITIZE_UNDEFINED __attribute__((__no_sanitize__("undefined")))
#define NO_SANITIZE_ADDRESS __attribute__((__no_sanitize__("address")))
#define NO_SANITIZE_THREAD __attribute__((__no_sanitize__("thread")))
#else
/// It does not work in GCC. GCC 7 cannot recognize this attribute and GCC 8 simply ignores it.
#define NO_SANITIZE_UNDEFINED
#define NO_SANITIZE_ADDRESS
#define NO_SANITIZE_THREAD
#endif
#if defined __GNUC__ && !defined __clang__

View File

@ -1,8 +1,8 @@
#pragma once
#include <cstdint>
#include <string>
#include <vector>
#include <cstdint>
namespace DB

View File

@ -60,7 +60,7 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
if (input_header.has(res_elem.name))
conversion[result_col_num] = input_header.getPositionByName(res_elem.name);
else
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
break;
}

View File

@ -12,7 +12,7 @@ namespace DB
class OneBlockInputStream : public IBlockInputStream
{
public:
OneBlockInputStream(const Block & block_) : block(block_) {}
explicit OneBlockInputStream(const Block & block_) : block(block_) {}
String getName() const override { return "One"; }

View File

@ -95,12 +95,11 @@ public:
{
active_threads = max_threads;
threads.reserve(max_threads);
auto thread_group = CurrentThread::getGroup();
try
{
for (size_t i = 0; i < max_threads; ++i)
threads.emplace_back([=] () { thread(thread_group, i); });
threads.emplace_back(&ParallelInputsProcessor::thread, this, CurrentThread::getGroup(), i);
}
catch (...)
{

View File

@ -63,6 +63,17 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
}
Block PushingToViewsBlockOutputStream::getHeader() const
{
/// If we don't write directly to the destination
/// then expect that we're inserting with precalculated virtual columns
if (output)
return storage->getSampleBlock();
else
return storage->getSampleBlockWithVirtuals();
}
void PushingToViewsBlockOutputStream::write(const Block & block)
{
/** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match.
@ -73,6 +84,8 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
Nested::validateArraySizes(block);
if (output)
/// TODO: to support virtual and alias columns inside MVs, we should return here the inserted block extended
/// with additional columns directly from storage and pass it to MVs instead of raw block.
output->write(block);
/// Don't process materialized views if this block is duplicate

View File

@ -22,7 +22,7 @@ public:
const String & database, const String & table, const StoragePtr & storage_,
const Context & context_, const ASTPtr & query_ptr_, bool no_destination = false);
Block getHeader() const override { return storage->getSampleBlock(); }
Block getHeader() const override;
void write(const Block & block) override;
void flush() override;

View File

@ -26,6 +26,7 @@ TTLBlockInputStream::TTLBlockInputStream(
, date_lut(DateLUT::instance())
{
children.push_back(input_);
header = children.at(0)->getHeader();
const auto & column_defaults = storage.getColumns().getDefaults();
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
@ -58,11 +59,6 @@ TTLBlockInputStream::TTLBlockInputStream(
}
Block TTLBlockInputStream::getHeader() const
{
return children.at(0)->getHeader();
}
Block TTLBlockInputStream::readImpl()
{
Block block = children.at(0)->read();
@ -108,11 +104,13 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
const auto & current = block.getByName(storage.ttl_table_entry.result_column);
const IColumn * ttl_column = current.column.get();
const auto & column_names = header.getNames();
MutableColumns result_columns;
result_columns.reserve(getHeader().columns());
for (const auto & name : storage.getColumns().getNamesOfPhysical())
result_columns.reserve(column_names.size());
for (auto it = column_names.begin(); it != column_names.end(); ++it)
{
auto & column_with_type = block.getByName(name);
auto & column_with_type = block.getByName(*it);
const IColumn * values_column = column_with_type.column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
@ -125,13 +123,13 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
new_ttl_infos.table_ttl.update(cur_ttl);
result_column->insertFrom(*values_column, i);
}
else
else if (it == column_names.begin())
++rows_removed;
}
result_columns.emplace_back(std::move(result_column));
}
block = getHeader().cloneWithColumns(std::move(result_columns));
block = header.cloneWithColumns(std::move(result_columns));
}
void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)

View File

@ -21,7 +21,7 @@ public:
String getName() const override { return "TTLBlockInputStream"; }
Block getHeader() const override;
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
@ -47,6 +47,8 @@ private:
std::unordered_map<String, String> defaults_result_column;
ExpressionActionsPtr defaults_expression;
Block header;
private:
/// Removes values with expired ttl and computes new min_ttl and empty_columns for part
void removeValuesWithExpiredColumnTTL(Block & block);

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
extern const int PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
@ -216,6 +217,12 @@ void DataTypeAggregateFunction::deserializeTextQuoted(IColumn & column, ReadBuff
}
void DataTypeAggregateFunction::deserializeWholeText(IColumn &, ReadBuffer &, const FormatSettings &) const
{
throw Exception("AggregateFunction data type cannot be read from text", ErrorCodes::NOT_IMPLEMENTED);
}
void DataTypeAggregateFunction::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeJSONString(serializeToString(function, column, row_num), ostr, settings);

View File

@ -52,6 +52,8 @@ public:
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;

View File

@ -33,6 +33,10 @@ public:
*/
virtual void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
/** Text deserialization without quoting or escaping.
*/
virtual void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
/** Text serialization with escaping but without quoting.
*/
virtual void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;

View File

@ -32,6 +32,13 @@ DataTypeCustomSimpleTextSerialization::~DataTypeCustomSimpleTextSerialization()
{
}
void DataTypeCustomSimpleTextSerialization::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String str;
readString(str, istr);
deserializeFromString(*this, column, str, settings);
}
void DataTypeCustomSimpleTextSerialization::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeEscapedString(serializeToString(*this, column, row_num, settings), ostr);

View File

@ -21,6 +21,10 @@ public:
virtual void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override = 0;
virtual void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
/** Text deserialization without quoting or escaping.
*/
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
/** Text serialization with escaping but without quoting.
*/
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;

View File

@ -16,6 +16,11 @@ void DataTypeDate::serializeText(const IColumn & column, size_t row_num, WriteBu
writeDateText(DayNum(static_cast<const ColumnUInt16 &>(column).getData()[row_num]), ostr);
}
void DataTypeDate::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextEscaped(column, istr, settings);
}
void DataTypeDate::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
DayNum x;

View File

@ -13,6 +13,7 @@ public:
const char * getFamilyName() const override { return "Date"; }
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;

View File

@ -62,6 +62,11 @@ static inline void readText(time_t & x, ReadBuffer & istr, const FormatSettings
}
void DataTypeDateTime::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextEscaped(column, istr, settings);
}
void DataTypeDateTime::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
time_t x;

View File

@ -38,6 +38,7 @@ public:
TypeIndex getTypeId() const override { return TypeIndex::DateTime; }
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;

View File

@ -166,6 +166,14 @@ void DataTypeEnum<Type>::deserializeTextQuoted(IColumn & column, ReadBuffer & is
static_cast<ColumnType &>(column).getData().push_back(getValue(StringRef(field_name)));
}
template <typename Type>
void DataTypeEnum<Type>::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
std::string field_name;
readString(field_name, istr);
static_cast<ColumnType &>(column).getData().push_back(getValue(StringRef(field_name)));
}
template <typename Type>
void DataTypeEnum<Type>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{

View File

@ -96,6 +96,8 @@ public:
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;

View File

@ -176,6 +176,12 @@ void DataTypeFixedString::deserializeTextQuoted(IColumn & column, ReadBuffer & i
}
void DataTypeFixedString::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
read(*this, column, [&istr](ColumnFixedString::Chars & data) { readStringInto(data, istr); });
}
void DataTypeFixedString::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
const char * pos = reinterpret_cast<const char *>(&static_cast<const ColumnFixedString &>(column).getChars()[n * row_num]);

View File

@ -50,6 +50,8 @@ public:
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;

View File

@ -81,6 +81,11 @@ public:
deserializeImpl(column, &IDataType::deserializeAsTextQuoted, istr, settings);
}
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
{
deserializeImpl(column, &IDataType::deserializeAsTextEscaped, istr, settings);
}
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override
{
serializeImpl(column, row_num, &IDataType::serializeAsTextCSV, ostr, settings);

View File

@ -251,6 +251,15 @@ void DataTypeNullable::deserializeTextQuoted(IColumn & column, ReadBuffer & istr
[this, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsTextQuoted(nested, istr, settings); });
}
void DataTypeNullable::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
safeDeserialize(column,
[&istr] { return checkStringByFirstCharacterAndAssertTheRestCaseInsensitive("NULL", istr); },
[this, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsWholeText(nested, istr, settings); });
}
void DataTypeNullable::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
const ColumnNullable & col = static_cast<const ColumnNullable &>(column);

View File

@ -53,6 +53,7 @@ public:
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;

View File

@ -244,6 +244,12 @@ static inline void read(IColumn & column, Reader && reader)
}
void DataTypeString::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
read(column, [&](ColumnString::Chars & data) { readStringInto(data, istr); });
}
void DataTypeString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
read(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); });

View File

@ -30,6 +30,7 @@ public:
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;

View File

@ -32,6 +32,11 @@ protected:
serializeText(column, row_num, ostr, settings);
}
void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
{
deserializeText(column, istr, settings);
}
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
{
deserializeText(column, istr, settings);

View File

@ -142,122 +142,90 @@ void IDataType::insertDefaultInto(IColumn & column) const
void IDataType::serializeAsTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->serializeTextEscaped(column, row_num, ostr, settings);
}
else
{
serializeTextEscaped(column, row_num, ostr, settings);
}
}
void IDataType::deserializeAsTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->deserializeTextEscaped(column, istr, settings);
}
else
{
deserializeTextEscaped(column, istr, settings);
}
}
void IDataType::serializeAsTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->serializeTextQuoted(column, row_num, ostr, settings);
}
else
{
serializeTextQuoted(column, row_num, ostr, settings);
}
}
void IDataType::deserializeAsTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->deserializeTextQuoted(column, istr, settings);
}
else
{
deserializeTextQuoted(column, istr, settings);
}
}
void IDataType::serializeAsTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->serializeTextCSV(column, row_num, ostr, settings);
}
else
{
serializeTextCSV(column, row_num, ostr, settings);
}
}
void IDataType::deserializeAsTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->deserializeTextCSV(column, istr, settings);
}
else
{
deserializeTextCSV(column, istr, settings);
}
}
void IDataType::serializeAsText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->serializeText(column, row_num, ostr, settings);
}
else
{
serializeText(column, row_num, ostr, settings);
}
void IDataType::deserializeAsWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (custom_text_serialization)
custom_text_serialization->deserializeWholeText(column, istr, settings);
else
deserializeWholeText(column, istr, settings);
}
void IDataType::serializeAsTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->serializeTextJSON(column, row_num, ostr, settings);
}
else
{
serializeTextJSON(column, row_num, ostr, settings);
}
}
void IDataType::deserializeAsTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->deserializeTextJSON(column, istr, settings);
}
else
{
deserializeTextJSON(column, istr, settings);
}
}
void IDataType::serializeAsTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
if (custom_text_serialization)
{
custom_text_serialization->serializeTextXML(column, row_num, ostr, settings);
}
else
{
serializeTextXML(column, row_num, ostr, settings);
}
}
void IDataType::setCustomization(DataTypeCustomDescPtr custom_desc_) const
{

View File

@ -222,76 +222,60 @@ public:
/// If method will throw an exception, then column will be in same state as before call to method.
virtual void deserializeBinary(IColumn & column, ReadBuffer & istr) const = 0;
/** Text serialization with escaping but without quoting.
*/
virtual void serializeAsTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
virtual void deserializeAsTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization as a literal that may be inserted into a query.
*/
virtual void serializeAsTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
virtual void deserializeAsTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization for the CSV format.
*/
virtual void serializeAsTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
virtual void deserializeAsTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization for displaying on a terminal or saving into a text file, and the like.
* Without escaping or quoting.
*/
virtual void serializeAsText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
/** Text serialization intended for using in JSON format.
*/
virtual void serializeAsTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
virtual void deserializeAsTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization for putting into the XML format.
*/
virtual void serializeAsTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const;
/** Serialize to a protobuf. */
virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const = 0;
virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0;
protected:
virtual String doGetName() const;
/** Text serialization with escaping but without quoting.
*/
public: // used somewhere in arcadia
virtual void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
void serializeAsTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
protected:
virtual void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
void deserializeAsTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization as a literal that may be inserted into a query.
*/
virtual void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
void serializeAsTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
virtual void deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
void deserializeAsTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization for the CSV format.
*/
virtual void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
virtual void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
void serializeAsTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
void deserializeAsTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization for displaying on a terminal or saving into a text file, and the like.
* Without escaping or quoting.
*/
virtual void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
void serializeAsText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
/** Text deserialization in case when buffer contains only one value, without any escaping and delimiters.
*/
void deserializeAsWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization intended for using in JSON format.
* force_quoting_64bit_integers parameter forces to brace UInt64 and Int64 types into quotes.
*/
virtual void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
virtual void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
void serializeAsTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
void deserializeAsTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const;
/** Text serialization for putting into the XML format.
*/
void serializeAsTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const;
protected:
virtual String doGetName() const;
/// Default implementations of text serialization in case of 'custom_text_serialization' is not set.
virtual void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
virtual void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
virtual void serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
virtual void deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
virtual void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
virtual void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
virtual void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
virtual void deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
virtual void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
virtual void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
virtual void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeText(column, row_num, ostr, settings);
@ -471,7 +455,6 @@ private:
public:
const IDataTypeCustomName * getCustomName() const { return custom_name.get(); }
const IDataTypeCustomTextSerialization * getCustomTextSerialization() const { return custom_text_serialization.get(); }
};

View File

@ -86,7 +86,7 @@ DatabaseIteratorPtr DatabaseDictionary::getIterator(const Context & context, con
bool DatabaseDictionary::empty(const Context & context) const
{
return context.getExternalDictionaries().getNumberOfNames() == 0;
return !context.getExternalDictionaries().hasCurrentlyLoadedObjects();
}
StoragePtr DatabaseDictionary::detachTable(const String & /*table_name*/)

View File

@ -65,11 +65,12 @@ void registerInputFormatRowBinary(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<BinaryRowInputStream>(buf, sample, false, false),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
factory.registerInputFormat("RowBinaryWithNamesAndTypes", [](
@ -78,11 +79,12 @@ void registerInputFormatRowBinary(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<BinaryRowInputStream>(buf, sample, true, true),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -28,9 +28,15 @@ BlockInputStreamFromRowInputStream::BlockInputStreamFromRowInputStream(
const Block & sample_,
UInt64 max_block_size_,
UInt64 rows_portion_size_,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
: row_input(row_input_), sample(sample_), max_block_size(max_block_size_), rows_portion_size(rows_portion_size_),
allow_errors_num(settings.input_allow_errors_num), allow_errors_ratio(settings.input_allow_errors_ratio)
: row_input(row_input_)
, sample(sample_)
, max_block_size(max_block_size_)
, rows_portion_size(rows_portion_size_)
, read_virtual_columns_callback(callback)
, allow_errors_num(settings.input_allow_errors_num)
, allow_errors_ratio(settings.input_allow_errors_ratio)
{
}
@ -73,6 +79,8 @@ Block BlockInputStreamFromRowInputStream::readImpl()
RowReadExtension info;
if (!row_input->read(columns, info))
break;
if (read_virtual_columns_callback)
read_virtual_columns_callback();
for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
{

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h>
#include <DataStreams/IBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowInputStream.h>
@ -24,6 +25,7 @@ public:
const Block & sample_,
UInt64 max_block_size_,
UInt64 rows_portion_size_,
FormatFactory::ReadCallback callback,
const FormatSettings & settings);
void readPrefix() override { row_input->readPrefix(); }
@ -45,6 +47,10 @@ private:
Block sample;
UInt64 max_block_size;
UInt64 rows_portion_size;
/// Callback used to setup virtual columns after reading each row.
FormatFactory::ReadCallback read_virtual_columns_callback;
BlockMissingValues block_missing_values;
UInt64 allow_errors_num;

View File

@ -531,11 +531,12 @@ void registerInputFormatCSV(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<CSVRowInputStream>(buf, sample, with_names, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}
}

View File

@ -308,6 +308,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
@ -315,6 +316,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
sample,
max_block_size,
rows_portion_size,
callback,
settings);
});
}

View File

@ -27,7 +27,14 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
}
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size) const
BlockInputStreamPtr FormatFactory::getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback) const
{
const auto & input_getter = getCreators(name).first;
if (!input_getter)
@ -48,7 +55,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
return input_getter(buf, sample, context, max_block_size, rows_portion_size, format_settings);
return input_getter(
buf, sample, context, max_block_size, rows_portion_size, callback ? callback : ReadCallback(), format_settings);
}

View File

@ -24,6 +24,11 @@ class WriteBuffer;
*/
class FormatFactory final : public ext::singleton<FormatFactory>
{
public:
/// This callback allows to perform some additional actions after reading a single row.
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
using ReadCallback = std::function<void()>;
private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
@ -31,6 +36,7 @@ private:
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback,
const FormatSettings & settings)>;
using OutputCreator = std::function<BlockOutputStreamPtr(
@ -44,8 +50,14 @@ private:
using FormatsDictionary = std::unordered_map<String, Creators>;
public:
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size = 0) const;
BlockInputStreamPtr getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size = 0,
ReadCallback callback = {}) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;

View File

@ -260,11 +260,12 @@ void registerInputFormatJSONEachRow(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -14,6 +14,7 @@ void registerInputFormatNative(FormatFactory & factory)
const Context &,
UInt64 /* max_block_size */,
UInt64 /* min_read_rows */,
FormatFactory::ReadCallback /* callback */,
const FormatSettings &)
{
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);

View File

@ -476,6 +476,7 @@ void registerInputFormatParquet(FormatFactory & factory)
const Context & context,
UInt64 /* max_block_size */,
UInt64 /* rows_portion_size */,
FormatFactory::ReadCallback /* callback */,
const FormatSettings & /* settings */) { return std::make_shared<ParquetBlockInputStream>(buf, sample, context); });
}

View File

@ -74,11 +74,12 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "Protobuf")),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -199,11 +199,12 @@ void registerInputFormatTSKV(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TSKVRowInputStream>(buf, sample, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -457,11 +457,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, false, false, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}
@ -473,11 +474,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, false, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}
@ -489,11 +491,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, true, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}
}

View File

@ -156,11 +156,12 @@ void registerInputFormatValues(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ValuesRowInputStream>(buf, sample, context, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -45,7 +45,7 @@ try
FormatSettings format_settings;
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample);

View File

@ -42,7 +42,7 @@ try
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample);
copyData(block_input, block_output);

View File

@ -65,6 +65,11 @@ if(USE_XXHASH)
target_include_directories(clickhouse_functions SYSTEM PRIVATE ${XXHASH_INCLUDE_DIR})
endif()
if (USE_H3)
target_link_libraries(clickhouse_functions PRIVATE ${H3_LIBRARY})
target_include_directories(clickhouse_functions SYSTEM PRIVATE ${H3_INCLUDE_DIR})
endif()
if(USE_HYPERSCAN)
target_link_libraries(clickhouse_functions PRIVATE ${HYPERSCAN_LIBRARY})
target_include_directories(clickhouse_functions SYSTEM PRIVATE ${HYPERSCAN_INCLUDE_DIR})

View File

@ -57,10 +57,10 @@ void RandImpl::execute(char * output, size_t size)
for (const char * end = output + size; output < end; output += 16)
{
unalignedStore(output, generator0.next());
unalignedStore(output + 4, generator1.next());
unalignedStore(output + 8, generator2.next());
unalignedStore(output + 12, generator3.next());
unalignedStore<UInt32>(output, generator0.next());
unalignedStore<UInt32>(output + 4, generator1.next());
unalignedStore<UInt32>(output + 8, generator2.next());
unalignedStore<UInt32>(output + 12, generator3.next());
}
/// It is guaranteed (by PaddedPODArray) that we can overwrite up to 15 bytes after end.

View File

@ -271,11 +271,17 @@ struct NgramDistanceImpl
{
size_t first_size = dispatchSearcher(calculateHaystackStatsAndMetric<false>, data.data(), data_size, common_stats, distance, nullptr);
/// For !Symmetric version we should not use first_size.
res = distance * 1.f / std::max(Symmetric * first_size + second_size, size_t(1));
if constexpr (Symmetric)
res = distance * 1.f / std::max(first_size + second_size, size_t(1));
else
res = 1.f - distance * 1.f / std::max(second_size, size_t(1));
}
else
{
if constexpr (Symmetric)
res = 1.f;
else
res = 0.f;
}
}
@ -333,13 +339,19 @@ struct NgramDistanceImpl
/// For !Symmetric version we should not use haystack_stats_size.
res[i] = distance * 1.f / std::max(Symmetric * haystack_stats_size + needle_stats_size, size_t(1));
if constexpr (Symmetric)
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, size_t(1));
else
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, size_t(1));
}
else
{
/// Strings are too big, we are assuming they are not the same. This is done because of limiting number
/// of bigrams added and not allocating too much memory.
if constexpr (Symmetric)
res[i] = 1.f;
else
res[i] = 0.f;
}
prev_needle_offset = needle_offsets[i];
@ -399,11 +411,11 @@ struct NgramDistanceImpl
for (size_t j = 0; j < needle_stats_size; ++j)
--common_stats[needle_ngram_storage[j]];
res[i] = distance * 1.f / std::max(needle_stats_size, size_t(1));
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, size_t(1));
}
else
{
res[i] = 1.f;
res[i] = 0.f;
}
prev_offset = needle_offsets[i];
@ -446,12 +458,18 @@ struct NgramDistanceImpl
distance,
ngram_storage.get());
/// For !Symmetric version we should not use haystack_stats_size.
res[i] = distance * 1.f / std::max(Symmetric * haystack_stats_size + needle_stats_size, size_t(1));
if constexpr (Symmetric)
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, size_t(1));
else
res[i] = 1.f - distance * 1.f / std::max(needle_stats_size, size_t(1));
}
else
{
/// if the strings are too big, we say they are completely not the same
if constexpr (Symmetric)
res[i] = 1.f;
else
res[i] = 0.f;
}
distance = needle_stats_size;
prev_offset = offsets[i];

View File

@ -91,8 +91,7 @@ struct ExtractBool
struct ExtractRaw
{
static constexpr size_t bytes_on_stack = 64;
using ExpectChars = PODArray<char, bytes_on_stack, AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>>;
using ExpectChars = PODArrayWithStackMemory<char, 64>;
static void extract(const UInt8 * pos, const UInt8 * end, ColumnString::Chars & res_data)
{

View File

@ -3,44 +3,117 @@
#include "protocol.h"
#include <common/find_symbols.h>
#include <cstring>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
namespace
{
inline StringRef checkAndReturnHost(const Pos & pos, const Pos & dot_pos, const Pos & start_of_host)
{
if (!dot_pos || start_of_host >= pos || pos - dot_pos == 1)
return StringRef{};
auto after_dot = *(dot_pos + 1);
if (after_dot == ':' || after_dot == '/' || after_dot == '?' || after_dot == '#')
return StringRef{};
return StringRef(start_of_host, pos - start_of_host);
}
}
/// Extracts host from given url.
inline StringRef getURLHost(const char * data, size_t size)
{
Pos pos = data;
Pos end = data + size;
if (end == (pos = find_first_symbols<'/'>(pos, end)))
return {};
if (pos != data)
if (*pos == '/' && *(pos + 1) == '/')
{
StringRef scheme = getURLScheme(data, size);
Pos scheme_end = data + scheme.size;
// Colon must follows after scheme.
if (pos - scheme_end != 1 || *scheme_end != ':')
return {};
pos += 2;
}
else
{
Pos scheme_end = data + std::min(size, 16UL);
for (++pos; pos < scheme_end; ++pos)
{
if (!isAlphaNumericASCII(*pos))
{
switch (*pos)
{
case '.':
case '-':
case '+':
break;
case ' ': /// restricted symbols
case '\t':
case '<':
case '>':
case '%':
case '{':
case '}':
case '|':
case '\\':
case '^':
case '~':
case '[':
case ']':
case ';':
case '=':
case '&':
return StringRef{};
default:
goto exloop;
}
}
}
exloop: if ((scheme_end - pos) > 2 && *pos == ':' && *(pos + 1) == '/' && *(pos + 2) == '/')
pos += 3;
else
pos = data;
}
if (end - pos < 2 || *(pos) != '/' || *(pos + 1) != '/')
return {};
pos += 2;
const char * start_of_host = pos;
Pos dot_pos = nullptr;
auto start_of_host = pos;
for (; pos < end; ++pos)
{
if (*pos == '@')
start_of_host = pos + 1;
else if (*pos == ':' || *pos == '/' || *pos == '?' || *pos == '#')
switch (*pos)
{
case '.':
dot_pos = pos;
break;
case ':': /// end symbols
case '/':
case '?':
case '#':
return checkAndReturnHost(pos, dot_pos, start_of_host);
case '@': /// myemail@gmail.com
start_of_host = pos + 1;
break;
case ' ': /// restricted symbols in whole URL
case '\t':
case '<':
case '>':
case '%':
case '{':
case '}':
case '|':
case '\\':
case '^':
case '~':
case '[':
case ']':
case ';':
case '=':
case '&':
return StringRef{};
}
}
return (pos == start_of_host) ? StringRef{} : StringRef(start_of_host, pos - start_of_host);
return checkAndReturnHost(pos, dot_pos, start_of_host);
}
template <bool without_www>

View File

@ -8,3 +8,4 @@
#cmakedefine01 USE_HYPERSCAN
#cmakedefine01 USE_SIMDJSON
#cmakedefine01 USE_RAPIDJSON
#cmakedefine01 USE_H3

View File

@ -0,0 +1,108 @@
#include "config_functions.h"
#if USE_H3
#include <array>
#include <math.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
extern "C"
{
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdocumentation"
#endif
#include <h3api.h>
#ifdef __clang__
#pragma clang diagnostic pop
#endif
}
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
/// Implements the function geoToH3 which takes 3 arguments (latitude, longitude and h3 resolution)
/// and returns h3 index of this point
class FunctionGeoToH3 : public IFunction
{
public:
static constexpr auto name = "geoToH3";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionGeoToH3>(); }
std::string getName() const override { return name; }
size_t getNumberOfArguments() const override { return 3; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
auto arg = arguments[0].get();
if (!WhichDataType(arg).isFloat64())
throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be Float64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
arg = arguments[1].get();
if (!WhichDataType(arg).isFloat64())
throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(2) + " of function " + getName() + ". Must be Float64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
arg = arguments[2].get();
if (!WhichDataType(arg).isUInt8())
throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(3) + " of function " + getName() + ". Must be UInt8",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt64>();
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
const auto col_lon = block.getByPosition(arguments[0]).column.get();
const auto col_lat = block.getByPosition(arguments[1]).column.get();
const auto col_res = block.getByPosition(arguments[2]).column.get();
auto dst = ColumnVector<UInt64>::create();
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : ext::range(0, input_rows_count))
{
const double lon = col_lon->getFloat64(row);
const double lat = col_lat->getFloat64(row);
const UInt8 res = col_res->getUInt(row);
GeoCoord coord;
coord.lon = H3_EXPORT(degsToRads)(lon);
coord.lat = H3_EXPORT(degsToRads)(lat);
H3Index hindex = H3_EXPORT(geoToH3)(&coord, res);
dst_data[row] = hindex;
}
block.getByPosition(result).column = std::move(dst);
}
};
void registerFunctionGeoToH3(FunctionFactory & factory)
{
factory.registerFunction<FunctionGeoToH3>();
}
}
#endif

View File

@ -0,0 +1,99 @@
#include <Functions/FunctionFactory.h>
#include <Functions/GeoUtils.h>
#include <Functions/FunctionHelpers.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <string>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
// geohashDecode(string) => (lon float64, lat float64)
class FunctionGeohashDecode : public IFunction
{
public:
static constexpr auto name = "geohashDecode";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionGeohashDecode>(); }
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
validateArgumentType(*this, arguments, 0, isStringOrFixedString, "string or fixed string");
return std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeFloat64>(), std::make_shared<DataTypeFloat64>()},
Strings{"longitude", "latitude"});
}
template <typename ColumnTypeEncoded>
bool tryExecute(const IColumn * encoded_column, ColumnPtr & result_column)
{
const auto * encoded = checkAndGetColumn<ColumnTypeEncoded>(encoded_column);
if (!encoded)
return false;
const size_t count = encoded->size();
auto latitude = ColumnFloat64::create(count);
auto longitude = ColumnFloat64::create(count);
ColumnFloat64::Container & lon_data = longitude->getData();
ColumnFloat64::Container & lat_data = latitude->getData();
for (size_t i = 0; i < count; ++i)
{
StringRef encoded_string = encoded->getDataAt(i);
GeoUtils::geohashDecode(encoded_string.data, encoded_string.size, &lon_data[i], &lat_data[i]);
}
MutableColumns result;
result.emplace_back(std::move(longitude));
result.emplace_back(std::move(latitude));
result_column = ColumnTuple::create(std::move(result));
return true;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
{
const IColumn * encoded = block.getByPosition(arguments[0]).column.get();
ColumnPtr & res_column = block.getByPosition(result).column;
if (tryExecute<ColumnString>(encoded, res_column) ||
tryExecute<ColumnFixedString>(encoded, res_column))
return;
throw Exception("Unsupported argument type:" + block.getByPosition(arguments[0]).column->getName()
+ " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
};
void registerFunctionGeohashDecode(FunctionFactory & factory)
{
factory.registerFunction<FunctionGeohashDecode>();
}
}

View File

@ -0,0 +1,136 @@
#include <Functions/FunctionFactory.h>
#include <Functions/GeoUtils.h>
#include <Functions/FunctionHelpers.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <string>
#define GEOHASH_MAX_TEXT_LENGTH 16
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_COLUMN;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
}
// geohashEncode(lon float32/64, lat float32/64, length UInt8) => string
class FunctionGeohashEncode : public IFunction
{
public:
static constexpr auto name = "geohashEncode";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionGeohashEncode>(); }
String getName() const override
{
return name;
}
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {2}; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
validateArgumentType(*this, arguments, 0, isFloat, "float");
validateArgumentType(*this, arguments, 1, isFloat, "float");
if (arguments.size() == 3)
{
validateArgumentType(*this, arguments, 2, isInteger, "integer");
}
if (arguments.size() > 3)
{
throw Exception("Too many arguments for function " + getName() +
" expected at most 3",
ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION);
}
return std::make_shared<DataTypeString>();
}
template <typename LonType, typename LatType>
bool tryExecute(const IColumn * lon_column, const IColumn * lat_column, UInt64 precision_value, ColumnPtr & result)
{
const ColumnVector<LonType> * longitude = checkAndGetColumn<ColumnVector<LonType>>(lon_column);
const ColumnVector<LatType> * latitude = checkAndGetColumn<ColumnVector<LatType>>(lat_column);
if (!latitude || !longitude)
return false;
auto col_str = ColumnString::create();
ColumnString::Chars & out_vec = col_str->getChars();
ColumnString::Offsets & out_offsets = col_str->getOffsets();
const size_t size = lat_column->size();
out_offsets.resize(size);
out_vec.resize(size * (GEOHASH_MAX_TEXT_LENGTH + 1));
char * begin = reinterpret_cast<char *>(out_vec.data());
char * pos = begin;
for (size_t i = 0; i < size; ++i)
{
const Float64 longitude_value = longitude->getElement(i);
const Float64 latitude_value = latitude->getElement(i);
const size_t encoded_size = GeoUtils::geohashEncode(longitude_value, latitude_value, precision_value, pos);
pos += encoded_size;
*pos = '\0';
out_offsets[i] = ++pos - begin;
}
out_vec.resize(pos - begin);
if (!out_offsets.empty() && out_offsets.back() != out_vec.size())
throw Exception("Column size mismatch (internal logical error)", ErrorCodes::LOGICAL_ERROR);
result = std::move(col_str);
return true;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
{
const IColumn * longitude = block.getByPosition(arguments[0]).column.get();
const IColumn * latitude = block.getByPosition(arguments[1]).column.get();
const UInt64 precision_value = std::min<UInt64>(GEOHASH_MAX_TEXT_LENGTH,
arguments.size() == 3 ? block.getByPosition(arguments[2]).column->get64(0) : GEOHASH_MAX_TEXT_LENGTH);
ColumnPtr & res_column = block.getByPosition(result).column;
if (tryExecute<Float32, Float32>(longitude, latitude, precision_value, res_column) ||
tryExecute<Float64, Float32>(longitude, latitude, precision_value, res_column) ||
tryExecute<Float32, Float64>(longitude, latitude, precision_value, res_column) ||
tryExecute<Float64, Float64>(longitude, latitude, precision_value, res_column))
return;
std::string arguments_description;
for (size_t i = 0; i < arguments.size(); ++i)
{
if (i != 0)
arguments_description += ", ";
arguments_description += block.getByPosition(arguments[i]).column->getName();
}
throw Exception("Unsupported argument types: " + arguments_description +
+ " for function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
};
void registerFunctionGeohashEncode(FunctionFactory & factory)
{
factory.registerFunction<FunctionGeohashEncode>();
}
}

View File

@ -0,0 +1,166 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
#include <Common/typeid_cast.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionFactory.h>
#include <ext/range.h>
#include <math.h>
#include <array>
#define DEGREES_IN_RADIANS (M_PI / 180.0)
#define EARTH_RADIUS_IN_METERS 6372797.560856
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_COLUMN;
extern const int LOGICAL_ERROR;
}
static inline Float64 degToRad(Float64 angle) { return angle * DEGREES_IN_RADIANS; }
/**
* The function calculates distance in meters between two points on Earth specified by longitude and latitude in degrees.
* The function uses great circle distance formula https://en.wikipedia.org/wiki/Great-circle_distance.
* Throws exception when one or several input values are not within reasonable bounds.
* Latitude must be in [-90, 90], longitude must be [-180, 180]
*
*/
class FunctionGreatCircleDistance : public IFunction
{
public:
static constexpr auto name = "greatCircleDistance";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionGreatCircleDistance>(); }
private:
enum class instr_type : uint8_t
{
get_float_64,
get_const_float_64
};
using instr_t = std::pair<instr_type, const IColumn *>;
using instrs_t = std::array<instr_t, 4>;
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 4; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (const auto arg_idx : ext::range(0, arguments.size()))
{
const auto arg = arguments[arg_idx].get();
if (!WhichDataType(arg).isFloat64())
throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(arg_idx + 1) + " of function " + getName() + ". Must be Float64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
return std::make_shared<DataTypeFloat64>();
}
instrs_t getInstructions(const Block & block, const ColumnNumbers & arguments, bool & out_const)
{
instrs_t result;
out_const = true;
for (const auto arg_idx : ext::range(0, arguments.size()))
{
const auto column = block.getByPosition(arguments[arg_idx]).column.get();
if (const auto col = checkAndGetColumn<ColumnVector<Float64>>(column))
{
out_const = false;
result[arg_idx] = instr_t{instr_type::get_float_64, col};
}
else if (const auto col_const = checkAndGetColumnConst<ColumnVector<Float64>>(column))
{
result[arg_idx] = instr_t{instr_type::get_const_float_64, col_const};
}
else
throw Exception("Illegal column " + column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
return result;
}
/// https://en.wikipedia.org/wiki/Great-circle_distance
Float64 greatCircleDistance(Float64 lon1Deg, Float64 lat1Deg, Float64 lon2Deg, Float64 lat2Deg)
{
if (lon1Deg < -180 || lon1Deg > 180 ||
lon2Deg < -180 || lon2Deg > 180 ||
lat1Deg < -90 || lat1Deg > 90 ||
lat2Deg < -90 || lat2Deg > 90)
{
throw Exception("Arguments values out of bounds for function " + getName(), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
Float64 lon1Rad = degToRad(lon1Deg);
Float64 lat1Rad = degToRad(lat1Deg);
Float64 lon2Rad = degToRad(lon2Deg);
Float64 lat2Rad = degToRad(lat2Deg);
Float64 u = sin((lat2Rad - lat1Rad) / 2);
Float64 v = sin((lon2Rad - lon1Rad) / 2);
return 2.0 * EARTH_RADIUS_IN_METERS * asin(sqrt(u * u + cos(lat1Rad) * cos(lat2Rad) * v * v));
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
const auto size = input_rows_count;
bool result_is_const{};
auto instrs = getInstructions(block, arguments, result_is_const);
if (result_is_const)
{
const auto & colLon1 = static_cast<const ColumnConst *>(block.getByPosition(arguments[0]).column.get())->getValue<Float64>();
const auto & colLat1 = static_cast<const ColumnConst *>(block.getByPosition(arguments[1]).column.get())->getValue<Float64>();
const auto & colLon2 = static_cast<const ColumnConst *>(block.getByPosition(arguments[2]).column.get())->getValue<Float64>();
const auto & colLat2 = static_cast<const ColumnConst *>(block.getByPosition(arguments[3]).column.get())->getValue<Float64>();
Float64 res = greatCircleDistance(colLon1, colLat1, colLon2, colLat2);
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(size, res);
}
else
{
auto dst = ColumnVector<Float64>::create();
auto & dst_data = dst->getData();
dst_data.resize(size);
Float64 vals[instrs.size()];
for (const auto row : ext::range(0, size))
{
for (const auto idx : ext::range(0, instrs.size()))
{
if (instr_type::get_float_64 == instrs[idx].first)
vals[idx] = static_cast<const ColumnVector<Float64> *>(instrs[idx].second)->getData()[row];
else if (instr_type::get_const_float_64 == instrs[idx].first)
vals[idx] = static_cast<const ColumnConst *>(instrs[idx].second)->getValue<Float64>();
else
throw Exception{"Unknown instruction type in implementation of greatCircleDistance function", ErrorCodes::LOGICAL_ERROR};
}
dst_data[row] = greatCircleDistance(vals[0], vals[1], vals[2], vals[3]);
}
block.getByPosition(result).column = std::move(dst);
}
}
};
void registerFunctionGreatCircleDistance(FunctionFactory & factory)
{
factory.registerFunction<FunctionGreatCircleDistance>();
}
}

View File

@ -1,17 +1,11 @@
#pragma once
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
#include <Common/typeid_cast.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionFactory.h>
#include <ext/range.h>
#include <math.h>
#include <array>
#define DEGREES_IN_RADIANS (M_PI / 180.0)
#define EARTH_RADIUS_IN_METERS 6372797.560856
namespace DB
@ -19,148 +13,11 @@ namespace DB
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_COLUMN;
extern const int LOGICAL_ERROR;
}
static inline Float64 degToRad(Float64 angle) { return angle * DEGREES_IN_RADIANS; }
static inline Float64 radToDeg(Float64 angle) { return angle / DEGREES_IN_RADIANS; }
/**
* The function calculates distance in meters between two points on Earth specified by longitude and latitude in degrees.
* The function uses great circle distance formula https://en.wikipedia.org/wiki/Great-circle_distance.
* Throws exception when one or several input values are not within reasonable bounds.
* Latitude must be in [-90, 90], longitude must be [-180, 180]
*
*/
class FunctionGreatCircleDistance : public IFunction
{
public:
static constexpr auto name = "greatCircleDistance";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionGreatCircleDistance>(); }
private:
enum class instr_type : uint8_t
{
get_float_64,
get_const_float_64
};
using instr_t = std::pair<instr_type, const IColumn *>;
using instrs_t = std::array<instr_t, 4>;
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 4; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (const auto arg_idx : ext::range(0, arguments.size()))
{
const auto arg = arguments[arg_idx].get();
if (!WhichDataType(arg).isFloat64())
throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(arg_idx + 1) + " of function " + getName() + ". Must be Float64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
return std::make_shared<DataTypeFloat64>();
}
instrs_t getInstructions(const Block & block, const ColumnNumbers & arguments, bool & out_const)
{
instrs_t result;
out_const = true;
for (const auto arg_idx : ext::range(0, arguments.size()))
{
const auto column = block.getByPosition(arguments[arg_idx]).column.get();
if (const auto col = checkAndGetColumn<ColumnVector<Float64>>(column))
{
out_const = false;
result[arg_idx] = instr_t{instr_type::get_float_64, col};
}
else if (const auto col_const = checkAndGetColumnConst<ColumnVector<Float64>>(column))
{
result[arg_idx] = instr_t{instr_type::get_const_float_64, col_const};
}
else
throw Exception("Illegal column " + column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
return result;
}
/// https://en.wikipedia.org/wiki/Great-circle_distance
Float64 greatCircleDistance(Float64 lon1Deg, Float64 lat1Deg, Float64 lon2Deg, Float64 lat2Deg)
{
if (lon1Deg < -180 || lon1Deg > 180 ||
lon2Deg < -180 || lon2Deg > 180 ||
lat1Deg < -90 || lat1Deg > 90 ||
lat2Deg < -90 || lat2Deg > 90)
{
throw Exception("Arguments values out of bounds for function " + getName(), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
Float64 lon1Rad = degToRad(lon1Deg);
Float64 lat1Rad = degToRad(lat1Deg);
Float64 lon2Rad = degToRad(lon2Deg);
Float64 lat2Rad = degToRad(lat2Deg);
Float64 u = sin((lat2Rad - lat1Rad) / 2);
Float64 v = sin((lon2Rad - lon1Rad) / 2);
return 2.0 * EARTH_RADIUS_IN_METERS * asin(sqrt(u * u + cos(lat1Rad) * cos(lat2Rad) * v * v));
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
const auto size = input_rows_count;
bool result_is_const{};
auto instrs = getInstructions(block, arguments, result_is_const);
if (result_is_const)
{
const auto & colLon1 = static_cast<const ColumnConst *>(block.getByPosition(arguments[0]).column.get())->getValue<Float64>();
const auto & colLat1 = static_cast<const ColumnConst *>(block.getByPosition(arguments[1]).column.get())->getValue<Float64>();
const auto & colLon2 = static_cast<const ColumnConst *>(block.getByPosition(arguments[2]).column.get())->getValue<Float64>();
const auto & colLat2 = static_cast<const ColumnConst *>(block.getByPosition(arguments[3]).column.get())->getValue<Float64>();
Float64 res = greatCircleDistance(colLon1, colLat1, colLon2, colLat2);
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(size, res);
}
else
{
auto dst = ColumnVector<Float64>::create();
auto & dst_data = dst->getData();
dst_data.resize(size);
Float64 vals[instrs.size()];
for (const auto row : ext::range(0, size))
{
for (const auto idx : ext::range(0, instrs.size()))
{
if (instr_type::get_float_64 == instrs[idx].first)
vals[idx] = static_cast<const ColumnVector<Float64> *>(instrs[idx].second)->getData()[row];
else if (instr_type::get_const_float_64 == instrs[idx].first)
vals[idx] = static_cast<const ColumnConst *>(instrs[idx].second)->getValue<Float64>();
else
throw Exception{"Unknown instruction type in implementation of greatCircleDistance function", ErrorCodes::LOGICAL_ERROR};
}
dst_data[row] = greatCircleDistance(vals[0], vals[1], vals[2], vals[3]);
}
block.getByPosition(result).column = std::move(dst);
}
}
};
/**
* The function checks if a point is in one of ellipses in set.
* The number of arguments must be 2 + 4*N where N is the number of ellipses.
@ -177,7 +34,6 @@ private:
class FunctionPointInEllipses : public IFunction
{
public:
static constexpr auto name = "pointInEllipses";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionPointInEllipses>(); }
@ -330,6 +186,10 @@ private:
}
};
void registerFunctionPointInEllipses(FunctionFactory & factory)
{
factory.registerFunction<FunctionPointInEllipses>();
}
#undef DEGREES_IN_RADIANS
}

Some files were not shown because too many files have changed in this diff Show More