Merge branch 'master' into addressToLineWithInlines

This commit is contained in:
mergify[bot] 2022-01-28 08:20:48 +00:00 committed by GitHub
commit 81c841a89c
64 changed files with 1979 additions and 412 deletions

.gitmodules vendored (6 lines changed)
View File

@ -217,6 +217,9 @@
[submodule "contrib/yaml-cpp"]
path = contrib/yaml-cpp
url = https://github.com/ClickHouse-Extras/yaml-cpp.git
[submodule "contrib/cld2"]
path = contrib/cld2
url = https://github.com/ClickHouse-Extras/cld2.git
[submodule "contrib/libstemmer_c"]
path = contrib/libstemmer_c
url = https://github.com/ClickHouse-Extras/libstemmer_c.git
@ -247,6 +250,9 @@
[submodule "contrib/sysroot"]
path = contrib/sysroot
url = https://github.com/ClickHouse-Extras/sysroot.git
[submodule "contrib/nlp-data"]
path = contrib/nlp-data
url = https://github.com/ClickHouse-Extras/nlp-data.git
[submodule "contrib/hive-metastore"]
path = contrib/hive-metastore
url = https://github.com/ClickHouse-Extras/hive-metastore

View File

@ -247,8 +247,6 @@ endif()
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
set(USE_DEBUG_HELPERS ON)
else ()
set(USE_DEBUG_HELPERS ON)
endif()
option(USE_DEBUG_HELPERS "Enable debug helpers" ${USE_DEBUG_HELPERS})

View File

@ -1,59 +1,80 @@
#include <sys/auxv.h>
#include "atomic.h"
#include <unistd.h> // __environ
#include <sys/auxv.h>
#include <fcntl.h> // open, O_RDONLY
#include <sys/stat.h> // historically required alongside fcntl.h for open()
#include <unistd.h> // read, close
#include <stdlib.h> // abort
#include <stdio.h> // perror, fprintf
#include <link.h> // ElfW
#include <errno.h>
// We don't have libc struct available here. Compute aux vector manually.
static unsigned long * __auxv = NULL;
static unsigned long __auxv_secure = 0;
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
static size_t __find_auxv(unsigned long type)
{
size_t i;
for (i = 0; __auxv[i]; i += 2)
{
if (__auxv[i] == type)
return i + 1;
}
return (size_t) -1;
}
// We don't have libc struct available here.
// Compute aux vector manually (from /proc/self/auxv).
//
// Right now there are only 51 AT_* constants,
// so 64 should be enough until this implementation is replaced with musl.
static unsigned long __auxv[64];
static unsigned long __auxv_secure = 0;
unsigned long __getauxval(unsigned long type)
{
if (type == AT_SECURE)
return __auxv_secure;
if (__auxv)
if (type >= ARRAY_SIZE(__auxv))
{
size_t index = __find_auxv(type);
if (index != ((size_t) -1))
return __auxv[index];
}
errno = ENOENT;
return 0;
}
static void * volatile getauxval_func;
static unsigned long __auxv_init(unsigned long type)
{
if (!__environ)
{
// __environ is not initialized yet, so we can't initialize __auxv right now.
// That normally occurs only when getauxval() is called from some sanitizer's internal code.
errno = ENOENT;
return 0;
}
// Initialize __auxv and __auxv_secure.
size_t i;
for (i = 0; __environ[i]; i++);
__auxv = (unsigned long *) (__environ + i + 1);
return __auxv[type];
}
size_t secure_idx = __find_auxv(AT_SECURE);
if (secure_idx != ((size_t) -1))
__auxv_secure = __auxv[secure_idx];
static void * volatile getauxval_func;
ssize_t __retry_read(int fd, void *buf, size_t count)
{
for (;;)
{
ssize_t ret = read(fd, buf, count);
if (ret == -1)
{
if (errno == EINTR)
continue;
perror("Cannot read /proc/self/auxv");
abort();
}
return ret;
}
}
static unsigned long __auxv_init(unsigned long type)
{
// od -t dL /proc/self/auxv
int fd = open("/proc/self/auxv", O_RDONLY);
if (fd == -1) {
perror("Cannot read /proc/self/auxv (likely kernel is too old or procfs is not mounted)");
abort();
}
ElfW(auxv_t) aux;
/// NOTE: sizeof(aux) is very small (less than PAGE_SIZE), so a partial read should not be possible.
_Static_assert(sizeof(aux) < 4096, "Unexpected sizeof(aux)");
while (__retry_read(fd, &aux, sizeof(aux)) == sizeof(aux))
{
if (aux.a_type >= ARRAY_SIZE(__auxv))
{
fprintf(stderr, "AT_* is out of range: %li (maximum allowed is %zu)\n", aux.a_type, ARRAY_SIZE(__auxv));
abort();
}
__auxv[aux.a_type] = aux.a_un.a_val;
}
close(fd);
// AT_SECURE
__auxv_secure = __getauxval(AT_SECURE);
// Now we've initialized __auxv; the next time getauxval() is called it will go straight to __getauxval().
a_cas_p(&getauxval_func, (void *)__auxv_init, (void *)__getauxval);

View File

@ -140,6 +140,8 @@ if (ENABLE_NLP)
add_contrib (libstemmer-c-cmake libstemmer_c)
add_contrib (wordnet-blast-cmake wordnet-blast)
add_contrib (lemmagen-c-cmake lemmagen-c)
add_contrib (nlp-data-cmake nlp-data)
add_contrib (cld2-cmake cld2)
endif()
add_contrib (sqlite-cmake sqlite-amalgamation)

contrib/cld2 vendored Submodule (1 line changed)

@ -0,0 +1 @@
Subproject commit bc6d493a2f64ed1fc1c4c4b4294a542a04e04217

View File

@ -0,0 +1,33 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/cld2")
set (SRCS
"${LIBRARY_DIR}/internal/cldutil.cc"
"${LIBRARY_DIR}/internal/compact_lang_det.cc"
"${LIBRARY_DIR}/internal/cldutil_shared.cc"
"${LIBRARY_DIR}/internal/compact_lang_det_hint_code.cc"
"${LIBRARY_DIR}/internal/compact_lang_det_impl.cc"
"${LIBRARY_DIR}/internal/debug.cc"
"${LIBRARY_DIR}/internal/fixunicodevalue.cc"
"${LIBRARY_DIR}/internal/generated_entities.cc"
"${LIBRARY_DIR}/internal/generated_language.cc"
"${LIBRARY_DIR}/internal/generated_ulscript.cc"
"${LIBRARY_DIR}/internal/getonescriptspan.cc"
"${LIBRARY_DIR}/internal/lang_script.cc"
"${LIBRARY_DIR}/internal/offsetmap.cc"
"${LIBRARY_DIR}/internal/scoreonescriptspan.cc"
"${LIBRARY_DIR}/internal/tote.cc"
"${LIBRARY_DIR}/internal/utf8statetable.cc"
"${LIBRARY_DIR}/internal/cld_generated_cjk_uni_prop_80.cc"
"${LIBRARY_DIR}/internal/cld2_generated_cjk_compatible.cc"
"${LIBRARY_DIR}/internal/cld_generated_cjk_delta_bi_4.cc"
"${LIBRARY_DIR}/internal/generated_distinct_bi_0.cc"
"${LIBRARY_DIR}/internal/cld2_generated_quadchrome_2.cc"
"${LIBRARY_DIR}/internal/cld2_generated_deltaoctachrome.cc"
"${LIBRARY_DIR}/internal/cld2_generated_distinctoctachrome.cc"
"${LIBRARY_DIR}/internal/cld_generated_score_quad_octa_2.cc"
)
add_library(_cld2 ${SRCS})
set_property(TARGET _cld2 PROPERTY POSITION_INDEPENDENT_CODE ON)
target_compile_options (_cld2 PRIVATE -Wno-reserved-id-macro -Wno-c++11-narrowing)
target_include_directories(_cld2 SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}/public")
add_library(ch_contrib::cld2 ALIAS _cld2)

contrib/nlp-data vendored Submodule (1 line changed)

@ -0,0 +1 @@
Subproject commit 5591f91f5e748cba8fb9ef81564176feae774853

View File

@ -0,0 +1,15 @@
include(${ClickHouse_SOURCE_DIR}/cmake/embed_binary.cmake)
set(LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/nlp-data")
add_library (_nlp_data INTERFACE)
clickhouse_embed_binaries(
TARGET nlp_dictionaries
RESOURCE_DIR "${LIBRARY_DIR}"
RESOURCES charset.zst tonality_ru.zst programming.zst
)
add_dependencies(_nlp_data nlp_dictionaries)
target_link_libraries(_nlp_data INTERFACE "-Wl,${WHOLE_ARCHIVE} $<TARGET_FILE:nlp_dictionaries> -Wl,${NO_WHOLE_ARCHIVE}")
add_library(ch_contrib::nlp_data ALIAS _nlp_data)

View File

@ -27,6 +27,7 @@ toc_title: Client Libraries
- Go
- [clickhouse](https://github.com/kshvakov/clickhouse/)
- [go-clickhouse](https://github.com/roistat/go-clickhouse)
- [chconn](https://github.com/vahid-sohrabloo/chconn)
- [mailrugo-clickhouse](https://github.com/mailru/go-clickhouse)
- [golang-clickhouse](https://github.com/leprosus/golang-clickhouse)
- Swift

View File

@ -17,15 +17,11 @@ class AggregateFunctionSimpleState final : public IAggregateFunctionHelper<Aggre
{
private:
AggregateFunctionPtr nested_func;
DataTypes arguments;
Array params;
public:
AggregateFunctionSimpleState(AggregateFunctionPtr nested_, const DataTypes & arguments_, const Array & params_)
: IAggregateFunctionHelper<AggregateFunctionSimpleState>(arguments_, params_)
, nested_func(nested_)
, arguments(arguments_)
, params(params_)
{
}
@ -35,18 +31,19 @@ public:
{
DataTypeCustomSimpleAggregateFunction::checkSupportedFunctions(nested_func);
// Need to make a clone because it'll be customized.
auto storage_type = DataTypeFactory::instance().get(nested_func->getReturnType()->getName());
// Need to make a clone to avoid recursive reference.
auto storage_type_out = DataTypeFactory::instance().get(nested_func->getReturnType()->getName());
// Need to make a new function with promoted argument types because SimpleAggregates requires arg_type = return_type.
AggregateFunctionProperties properties;
auto function
= AggregateFunctionFactory::instance().get(nested_func->getName(), {storage_type}, nested_func->getParameters(), properties);
= AggregateFunctionFactory::instance().get(nested_func->getName(), {storage_type_out}, nested_func->getParameters(), properties);
// Need to make a clone because it'll be customized.
auto storage_type_arg = DataTypeFactory::instance().get(nested_func->getReturnType()->getName());
DataTypeCustomNamePtr custom_name
= std::make_unique<DataTypeCustomSimpleAggregateFunction>(function, DataTypes{nested_func->getReturnType()}, params);
storage_type->setCustomization(std::make_unique<DataTypeCustomDesc>(std::move(custom_name), nullptr));
return storage_type;
= std::make_unique<DataTypeCustomSimpleAggregateFunction>(function, DataTypes{nested_func->getReturnType()}, parameters);
storage_type_arg->setCustomization(std::make_unique<DataTypeCustomDesc>(std::move(custom_name), nullptr));
return storage_type_arg;
}
bool isVersioned() const override

View File

@ -20,13 +20,12 @@ class AggregateFunctionState final : public IAggregateFunctionHelper<AggregateFu
{
private:
AggregateFunctionPtr nested_func;
DataTypes arguments;
Array params;
public:
AggregateFunctionState(AggregateFunctionPtr nested_, const DataTypes & arguments_, const Array & params_)
: IAggregateFunctionHelper<AggregateFunctionState>(arguments_, params_)
, nested_func(nested_), arguments(arguments_), params(params_) {}
, nested_func(nested_)
{}
String getName() const override
{

View File

@ -506,6 +506,7 @@ if (ENABLE_NLP)
dbms_target_link_libraries (PUBLIC ch_contrib::stemmer)
dbms_target_link_libraries (PUBLIC ch_contrib::wnb)
dbms_target_link_libraries (PUBLIC ch_contrib::lemmagen)
dbms_target_link_libraries (PUBLIC ch_contrib::nlp_data)
endif()
if (TARGET ch_contrib::bzip2)
@ -558,3 +559,4 @@ if (ENABLE_TESTS)
add_check(unit_tests_dbms)
endif ()

View File

@ -0,0 +1,252 @@
#pragma once
#include <Common/Arena.h>
#include <Common/getResource.h>
#include <Common/HashTable/HashMap.h>
#include <Common/StringUtils/StringUtils.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/readFloatText.h>
#include <IO/ZstdInflatingReadBuffer.h>
#include <base/StringRef.h>
#include <base/logger_useful.h>
#include <string_view>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
}
/// FrequencyHolder class is responsible for storing and loading dictionaries
/// needed for text classification functions:
///
/// 1. detectLanguageUnknown
/// 2. detectCharset
/// 3. detectTonality
/// 4. detectProgrammingLanguage
class FrequencyHolder
{
public:
struct Language
{
String name;
HashMap<StringRef, Float64> map;
};
struct Encoding
{
String name;
String lang;
HashMap<UInt16, Float64> map;
};
public:
using Map = HashMap<StringRef, Float64>;
using Container = std::vector<Language>;
using EncodingMap = HashMap<UInt16, Float64>;
using EncodingContainer = std::vector<Encoding>;
static FrequencyHolder & getInstance()
{
static FrequencyHolder instance;
return instance;
}
void loadEncodingsFrequency()
{
Poco::Logger * log = &Poco::Logger::get("EncodingsFrequency");
LOG_TRACE(log, "Loading embedded charset frequencies");
auto resource = getResource("charset.zst");
if (resource.empty())
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "There is no embedded charset frequencies");
String line;
UInt16 bigram;
Float64 frequency;
String charset_name;
auto buf = std::make_unique<ReadBufferFromMemory>(resource.data(), resource.size());
ZstdInflatingReadBuffer in(std::move(buf));
while (!in.eof())
{
readString(line, in);
in.ignore();
if (line.empty())
continue;
ReadBufferFromString buf_line(line);
// Start loading a new charset
if (line.starts_with("// "))
{
// Skip "// "
buf_line.ignore(3);
readString(charset_name, buf_line);
/* Our dictionary contains lines of the form: <Language>_<Charset>.
* If we need to find the language of the data, we return <Language>;
* if we need to find the charset of the data, we return <Charset>.
*/
size_t sep = charset_name.find('_');
Encoding enc;
enc.lang = charset_name.substr(0, sep);
enc.name = charset_name.substr(sep + 1);
encodings_freq.push_back(std::move(enc));
}
else
{
readIntText(bigram, buf_line);
buf_line.ignore();
readFloatText(frequency, buf_line);
encodings_freq.back().map[bigram] = frequency;
}
}
LOG_TRACE(log, "Charset frequencies was added, charsets count: {}", encodings_freq.size());
}
void loadEmotionalDict()
{
Poco::Logger * log = &Poco::Logger::get("EmotionalDict");
LOG_TRACE(log, "Loading embedded emotional dictionary");
auto resource = getResource("tonality_ru.zst");
if (resource.empty())
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "There is no embedded emotional dictionary");
String line;
String word;
Float64 tonality;
size_t count = 0;
auto buf = std::make_unique<ReadBufferFromMemory>(resource.data(), resource.size());
ZstdInflatingReadBuffer in(std::move(buf));
while (!in.eof())
{
readString(line, in);
in.ignore();
if (line.empty())
continue;
ReadBufferFromString buf_line(line);
readStringUntilWhitespace(word, buf_line);
buf_line.ignore();
readFloatText(tonality, buf_line);
StringRef ref{string_pool.insert(word.data(), word.size()), word.size()};
emotional_dict[ref] = tonality;
++count;
}
LOG_TRACE(log, "Emotional dictionary was added. Word count: {}", std::to_string(count));
}
void loadProgrammingFrequency()
{
Poco::Logger * log = &Poco::Logger::get("ProgrammingFrequency");
LOG_TRACE(log, "Loading embedded programming languages frequencies loading");
auto resource = getResource("programming.zst");
if (resource.empty())
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "There is no embedded programming languages frequencies");
String line;
String bigram;
Float64 frequency;
String programming_language;
auto buf = std::make_unique<ReadBufferFromMemory>(resource.data(), resource.size());
ZstdInflatingReadBuffer in(std::move(buf));
while (!in.eof())
{
readString(line, in);
in.ignore();
if (line.empty())
continue;
ReadBufferFromString buf_line(line);
// Start loading a new language
if (line.starts_with("// "))
{
// Skip "// "
buf_line.ignore(3);
readString(programming_language, buf_line);
Language lang;
lang.name = programming_language;
programming_freq.push_back(std::move(lang));
}
else
{
readStringUntilWhitespace(bigram, buf_line);
buf_line.ignore();
readFloatText(frequency, buf_line);
StringRef ref{string_pool.insert(bigram.data(), bigram.size()), bigram.size()};
programming_freq.back().map[ref] = frequency;
}
}
LOG_TRACE(log, "Programming languages frequencies was added");
}
const Map & getEmotionalDict()
{
std::lock_guard lock(mutex);
if (emotional_dict.empty())
loadEmotionalDict();
return emotional_dict;
}
const EncodingContainer & getEncodingsFrequency()
{
std::lock_guard lock(mutex);
if (encodings_freq.empty())
loadEncodingsFrequency();
return encodings_freq;
}
const Container & getProgrammingFrequency()
{
std::lock_guard lock(mutex);
if (programming_freq.empty())
loadProgrammingFrequency();
return programming_freq;
}
private:
Arena string_pool;
Map emotional_dict;
Container programming_freq;
EncodingContainer encodings_freq;
std::mutex mutex;
};
}
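
As a hedged usage sketch (not part of the commit): the detect* implementations are expected to pull these lazily loaded dictionaries roughly like this. tonalityOf is a hypothetical helper, and the includes assume ClickHouse's internal headers:

#include <Common/FrequencyHolder.h>
#include <base/types.h>

Float64 tonalityOf(StringRef word)
{
    // The first call triggers loadEmotionalDict(); subsequent calls reuse the cached map.
    const auto & dict = DB::FrequencyHolder::getInstance().getEmotionalDict();
    const auto * it = dict.find(word);
    return it != dict.end() ? it->getMapped() : 0.0;
}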

View File

@ -1,5 +1,6 @@
#pragma once
#include <cstdint>
#include <Common/VariableContext.h>
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:

View File

@ -1,5 +1,6 @@
#pragma once
#include <cstdint>
#include <Common/VariableContext.h>
/// To be able to temporarily stop memory tracking from current thread.

View File

@ -24,7 +24,6 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_PARAMETER;
extern const int BAD_ARGUMENTS;
}
@ -34,9 +33,12 @@ namespace ErrorCodes
*/
struct StringSearcherBase
class StringSearcherBase
{
public:
bool force_fallback = false;
#ifdef __SSE2__
protected:
static constexpr auto n = sizeof(__m128i);
const int page_size = ::getPageSize();
@ -53,7 +55,7 @@ template <bool CaseSensitive, bool ASCII> class StringSearcher;
/// Case-insensitive UTF-8 searcher
template <>
class StringSearcher<false, false> : private StringSearcherBase
class StringSearcher<false, false> : public StringSearcherBase
{
private:
using UTF8SequenceBuffer = uint8_t[6];
@ -119,11 +121,14 @@ public:
size_t length_u = UTF8::convertCodePointToUTF8(first_u_u32, u_seq, sizeof(u_seq));
if (length_l != length_u)
throw Exception{"UTF8 sequences with different lowercase and uppercase lengths are not supported", ErrorCodes::UNSUPPORTED_PARAMETER};
force_fallback = true;
}
l = l_seq[0];
u = u_seq[0];
if (force_fallback)
return;
}
#ifdef __SSE4_1__
@ -158,7 +163,10 @@ public:
/// @note Unicode standard states it is a rare but possible occasion
if (!(dst_l_len == dst_u_len && dst_u_len == src_len))
throw Exception{"UTF8 sequences with different lowercase and uppercase lengths are not supported", ErrorCodes::UNSUPPORTED_PARAMETER};
{
force_fallback = true;
return;
}
}
cache_actual_len += src_len;
@ -199,9 +207,10 @@ public:
if (Poco::Unicode::toLower(*haystack_code_point) != Poco::Unicode::toLower(*needle_code_point))
break;
/// @note assuming sequences for lowercase and uppercase have exactly the same length (that is not always true)
const auto len = UTF8::seqLength(*haystack_pos);
auto len = UTF8::seqLength(*haystack_pos);
haystack_pos += len;
len = UTF8::seqLength(*needle_pos);
needle_pos += len;
}
@ -213,7 +222,7 @@ public:
{
#ifdef __SSE4_1__
if (pageSafe(pos))
if (pageSafe(pos) && !force_fallback)
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel);
@ -262,7 +271,7 @@ public:
while (haystack < haystack_end)
{
#ifdef __SSE4_1__
if (haystack + n <= haystack_end && pageSafe(haystack))
if (haystack + n <= haystack_end && pageSafe(haystack) && !force_fallback)
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, patl);
@ -339,7 +348,7 @@ public:
/// Case-insensitive ASCII searcher
template <>
class StringSearcher<false, true> : private StringSearcherBase
class StringSearcher<false, true> : public StringSearcherBase
{
private:
/// string to be searched for
@ -541,7 +550,7 @@ public:
/// Case-sensitive searcher (both ASCII and UTF-8)
template <bool ASCII>
class StringSearcher<true, ASCII> : private StringSearcherBase
class StringSearcher<true, ASCII> : public StringSearcherBase
{
private:
/// string to be searched for
@ -725,7 +734,7 @@ public:
// Any value outside of basic ASCII (>=128) is considered a non-separator symbol, hence UTF-8 strings
// should work just fine. But any Unicode whitespace is not considered a token separator.
template <typename StringSearcher>
class TokenSearcher
class TokenSearcher : public StringSearcherBase
{
StringSearcher searcher;
size_t needle_size;
@ -809,7 +818,7 @@ using ASCIICaseInsensitiveTokenSearcher = TokenSearcher<ASCIICaseInsensitiveStri
* It is required that strings are zero-terminated.
*/
struct LibCASCIICaseSensitiveStringSearcher
struct LibCASCIICaseSensitiveStringSearcher : public StringSearcherBase
{
const char * const needle;
@ -833,7 +842,7 @@ struct LibCASCIICaseSensitiveStringSearcher
}
};
struct LibCASCIICaseInsensitiveStringSearcher
struct LibCASCIICaseInsensitiveStringSearcher : public StringSearcherBase
{
const char * const needle;

View File

@ -0,0 +1,177 @@
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/CrashLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/SessionLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/SystemLogBase.h>
#include <base/logger_useful.h>
#include <base/scope_guard.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}
namespace
{
constexpr size_t DBMS_SYSTEM_LOG_QUEUE_SIZE = 1048576;
}
void ISystemLog::stopFlushThread()
{
{
std::lock_guard lock(mutex);
if (!saving_thread.joinable())
{
return;
}
if (is_shutdown)
{
return;
}
is_shutdown = true;
/// Tell thread to shutdown.
flush_event.notify_all();
}
saving_thread.join();
}
void ISystemLog::startup()
{
std::lock_guard lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
}
static thread_local bool recursive_add_call = false;
template <typename LogElement>
void SystemLogBase<LogElement>::add(const LogElement & element)
{
/// It is possible that the method will be called recursively.
/// Better to drop these events to avoid complications.
if (recursive_add_call)
return;
recursive_add_call = true;
SCOPE_EXIT({ recursive_add_call = false; });
/// Memory can be allocated while resizing on queue.push_back.
/// The size of allocation can be in order of a few megabytes.
/// But this should not be accounted for query memory usage.
/// Otherwise tests like 01017_uniqCombined_memory_usage.sql will be flaky.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
/// Should not log messages under mutex.
bool queue_is_half_full = false;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2)
{
queue_is_half_full = true;
// The queue is more than half full, time to flush.
// We only check for strict equality, because messages are added one
// by one, under exclusive lock, so we will see each message count.
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_up_to < queue_end)
requested_flush_up_to = queue_end;
flush_event.notify_all();
}
if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE)
{
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index);
}
return;
}
queue.push_back(element);
}
if (queue_is_half_full)
LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name()));
}
template <typename LogElement>
void SystemLogBase<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= force;
requested_flush_up_to = std::max(requested_flush_up_to, this_thread_requested_offset);
flush_event.notify_all();
}
LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
{
return flushed_up_to >= this_thread_requested_offset && !is_force_prepare_tables;
});
if (!result)
{
throw Exception(
"Timeout exceeded (" + toString(timeout_seconds) + " s) while flushing system log '" + demangle(typeid(*this).name()) + "'.",
ErrorCodes::TIMEOUT_EXCEEDED);
}
}
#define INSTANTIATE_SYSTEM_LOG_BASE(ELEMENT) template class SystemLogBase<ELEMENT>;
SYSTEM_LOG_ELEMENTS(INSTANTIATE_SYSTEM_LOG_BASE)
}
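
The add()/flush() pair above boils down to an offset handshake: writers publish the queue end they need flushed, and the saving thread advances a flushed-up-to watermark and signals. A compressed model of just that handshake, with hypothetical names and only the standard library (a sketch, not the commit's code):

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <stdexcept>

struct FlushHandshake
{
    std::mutex mutex;
    std::condition_variable flush_event;
    uint64_t requested_flush_up_to = 0;
    uint64_t flushed_up_to = 0;

    // Called by a writer that wants its data on disk.
    void requestAndWait(uint64_t my_offset)
    {
        std::unique_lock lock(mutex);
        // Publish the request without overwriting a larger one made by another thread.
        requested_flush_up_to = std::max(requested_flush_up_to, my_offset);
        flush_event.notify_all();
        if (!flush_event.wait_for(lock, std::chrono::seconds(180),
                [&] { return flushed_up_to >= my_offset; }))
            throw std::runtime_error("flush timeout");
    }

    // Called by the saving thread after it has written entries up to `up_to`.
    void markFlushed(uint64_t up_to)
    {
        std::lock_guard lock(mutex);
        flushed_up_to = up_to;
        flush_event.notify_all();
    }
};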

src/Common/SystemLogBase.h Normal file (109 lines changed)
View File

@ -0,0 +1,109 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <memory>
#include <thread>
#include <vector>
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Common/ThreadPool.h>
#define SYSTEM_LOG_ELEMENTS(M) \
M(AsynchronousMetricLogElement) \
M(CrashLogElement) \
M(MetricLogElement) \
M(OpenTelemetrySpanLogElement) \
M(PartLogElement) \
M(QueryLogElement) \
M(QueryThreadLogElement) \
M(QueryViewsLogElement) \
M(SessionLogElement) \
M(TraceLogElement) \
M(ZooKeeperLogElement) \
M(TextLogElement)
namespace Poco
{
class Logger;
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
struct StorageID;
class ISystemLog
{
public:
virtual String getName() = 0;
/// force -- force table creation (used for SYSTEM FLUSH LOGS)
virtual void flush(bool force = false) = 0;
virtual void prepareTable() = 0;
/// Start the background thread.
virtual void startup();
/// Stop the background flush thread before destructor. No more data will be written.
virtual void shutdown() = 0;
virtual ~ISystemLog() = default;
virtual void savingThreadFunction() = 0;
protected:
ThreadFromGlobalPool saving_thread;
/// Data shared between callers of add()/flush()/shutdown(), and the saving thread
std::mutex mutex;
bool is_shutdown = false;
std::condition_variable flush_event;
void stopFlushThread();
};
template <typename LogElement>
class SystemLogBase : public ISystemLog
{
public:
using Self = SystemLogBase;
/** Append a record into log.
* Writing to table will be done asynchronously and in case of failure, record could be lost.
*/
void add(const LogElement & element);
/// Flush data in the buffer to disk
void flush(bool force) override;
String getName() override { return LogElement::name(); }
protected:
Poco::Logger * log;
// Queue is bounded. But its size is quite large to not block in all normal cases.
std::vector<LogElement> queue;
// An always-incrementing index of the first message currently in the queue.
// We use it to give a global sequential index to every message, so that we
// can wait until a particular message is flushed. This is used to implement
// synchronous log flushing for SYSTEM FLUSH LOGS.
uint64_t queue_front_index = 0;
// A flag that says we must create the tables even if the queue is empty.
bool is_force_prepare_tables = false;
// Requested to flush logs up to this index, exclusive
uint64_t requested_flush_up_to = 0;
// Flushed log up to this index, exclusive
uint64_t flushed_up_to = 0;
// Logged overflow message at this queue front index
uint64_t logged_queue_full_at_index = -1;
};
}

View File

@ -372,7 +372,7 @@ public:
, fallback{VolnitskyTraits::isFallbackNeedle(needle_size, haystack_size_hint)}
, fallback_searcher{needle_, needle_size}
{
if (fallback)
if (fallback || fallback_searcher.force_fallback)
return;
hash = std::unique_ptr<VolnitskyTraits::Offset[]>(new VolnitskyTraits::Offset[VolnitskyTraits::hash_size]{});
@ -393,7 +393,7 @@ public:
const auto haystack_end = haystack + haystack_size;
if (fallback || haystack_size <= needle_size)
if (fallback || haystack_size <= needle_size || fallback_searcher.force_fallback)
return fallback_searcher.search(haystack, haystack_end);
/// Let's "apply" the needle to the haystack and compare the n-gram from the end of the needle.

View File

@ -3,12 +3,7 @@ include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(clickhouse_common_zookeeper .)
# for clickhouse server
#
# NOTE: this library depends on Interpreters (DB::SystemLog<DB::ZooKeeperLogElement>::add),
# and so it should be STATIC because otherwise:
# - it will either fail to compile with -Wl,--unresolved-symbols=report-all
# - or it will report errors at runtime
add_library(clickhouse_common_zookeeper STATIC ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
add_library(clickhouse_common_zookeeper ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
target_compile_definitions (clickhouse_common_zookeeper PRIVATE -DZOOKEEPER_LOG)
target_link_libraries (clickhouse_common_zookeeper
PUBLIC

View File

@ -32,6 +32,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH;
extern const int ILLEGAL_INDEX;
extern const int LOGICAL_ERROR;
}
@ -156,8 +157,19 @@ MutableColumnPtr DataTypeTuple::createColumn() const
MutableColumnPtr DataTypeTuple::createColumn(const ISerialization & serialization) const
{
const auto & element_serializations =
assert_cast<const SerializationTuple &>(serialization).getElementsSerializations();
/// If we read a subcolumn of a nested Tuple, it may be wrapped in SerializationNamed
/// several times to allow reconstructing the substream path name.
/// Here we don't need the substream path name, so we drop the leading wrapper serializations.
const auto * current_serialization = &serialization;
while (const auto * serialization_named = typeid_cast<const SerializationNamed *>(current_serialization))
current_serialization = serialization_named->getNested().get();
const auto * serialization_tuple = typeid_cast<const SerializationTuple *>(current_serialization);
if (!serialization_tuple)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected serialization to create column of type Tuple");
const auto & element_serializations = serialization_tuple->getElementsSerializations();
size_t size = elems.size();
assert(element_serializations.size() == size);

View File

@ -5,6 +5,11 @@
namespace DB
{
/// Serialization wrapper that acts like the nested serialization,
/// but adds a given name to the substream path, as if the column
/// being read were the tuple element with that name.
/// It's used while reading subcolumns of complex types,
/// in particular while reading components of named tuples.
class SerializationNamed final : public SerializationWrapper
{
private:

View File

@ -76,6 +76,10 @@ endif()
target_link_libraries(clickhouse_functions PRIVATE ch_contrib::lz4)
if (ENABLE_NLP)
target_link_libraries(clickhouse_functions PRIVATE ch_contrib::cld2)
endif()
if (TARGET ch_contrib::h3)
target_link_libraries (clickhouse_functions PRIVATE ch_contrib::h3)
endif()

View File

@ -18,6 +18,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int DECIMAL_OVERFLOW;
extern const int ILLEGAL_COLUMN;
}
/// Cast DateTime64 to Int64 representation narrowed down (or scaled up) to any scale value defined in Impl.
@ -108,8 +109,8 @@ public:
if (arguments.size() < 1 || arguments.size() > 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes one or two arguments", name);
if (!typeid_cast<const DataTypeInt64 *>(arguments[0].type.get()))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The first argument for function {} must be Int64", name);
if (!isInteger(arguments[0].type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The first argument for function {} must be integer", name);
std::string timezone;
if (arguments.size() == 2)
@ -118,21 +119,48 @@ public:
return std::make_shared<DataTypeDateTime64>(target_scale, timezone);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
template <typename T>
bool executeType(auto & result_column, const ColumnsWithTypeAndName & arguments, size_t input_rows_count) const
{
const auto & src = arguments[0];
const auto & col = *src.column;
auto res_column = ColumnDecimal<DateTime64>::create(input_rows_count, target_scale);
auto & result_data = res_column->getData();
if (!checkAndGetColumn<ColumnVector<T>>(col))
return false;
const auto & source_data = typeid_cast<const ColumnInt64 &>(col).getData();
auto & result_data = result_column->getData();
const auto & source_data = typeid_cast<const ColumnVector<T> &>(col).getData();
for (size_t i = 0; i < input_rows_count; ++i)
result_data[i] = source_data[i];
return res_column;
return true;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
auto result_column = ColumnDecimal<DateTime64>::create(input_rows_count, target_scale);
if (!((executeType<UInt8>(result_column, arguments, input_rows_count))
|| (executeType<UInt16>(result_column, arguments, input_rows_count))
|| (executeType<UInt32>(result_column, arguments, input_rows_count))
|| (executeType<UInt32>(result_column, arguments, input_rows_count))
|| (executeType<UInt64>(result_column, arguments, input_rows_count))
|| (executeType<Int8>(result_column, arguments, input_rows_count))
|| (executeType<Int16>(result_column, arguments, input_rows_count))
|| (executeType<Int32>(result_column, arguments, input_rows_count))
|| (executeType<Int64>(result_column, arguments, input_rows_count))))
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}",
arguments[0].column->getName(),
getName());
}
return result_column;
}
};
}
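
The executeImpl above simply tries each integer column type in turn until one matches. A hedged, ClickHouse-free sketch of the same try-each-type dispatch idiom (all types and names here are illustrative stand-ins):

#include <cstdint>
#include <stdexcept>
#include <vector>

// Illustrative stand-ins for typed columns: the source column is opaque,
// and each executeType<T> succeeds only if the runtime type matches.
struct IColumn { virtual ~IColumn() = default; };
template <typename T>
struct ColumnVector : IColumn { std::vector<T> data; };

template <typename T>
bool executeType(std::vector<int64_t> & result, const IColumn & col)
{
    const auto * typed = dynamic_cast<const ColumnVector<T> *>(&col);
    if (!typed)
        return false; // not our type; let the next candidate try
    result.assign(typed->data.begin(), typed->data.end());
    return true;
}

std::vector<int64_t> execute(const IColumn & col)
{
    std::vector<int64_t> result;
    if (!(executeType<uint8_t>(result, col) || executeType<uint16_t>(result, col)
        || executeType<uint32_t>(result, col) || executeType<int8_t>(result, col)
        || executeType<int16_t>(result, col) || executeType<int32_t>(result, col)
        || executeType<int64_t>(result, col)))
        throw std::invalid_argument("unsupported column type");
    return result;
}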

View File

@ -0,0 +1,142 @@
#include <Common/FrequencyHolder.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsTextClassification.h>
#include <memory>
#include <unordered_map>
namespace DB
{
/* Determine the language and charset of text data. For each text, we build the distribution of byte bigrams.
* Then we compare it against marked-up dictionaries with the byte-bigram distributions of various languages and charsets.
* Using a naive Bayes classifier, we find the most likely charset and language and return it.
*/
template <bool detect_language>
struct CharsetClassificationImpl
{
/* We need to solve the zero-frequency problem of the naive Bayes classifier:
* if a bigram is not found in the dictionary, we assume the probability of encountering it is 1e-06,
* which is the minimal value in our marked-up dictionary.
*/
static constexpr Float64 zero_frequency = 1e-06;
/// If the data size is bigger than this, behaviour is unspecified for this function.
static constexpr size_t max_string_size = 1u << 15;
static ALWAYS_INLINE inline Float64 naiveBayes(
const FrequencyHolder::EncodingMap & standard,
const HashMap<UInt16, UInt64> & model,
Float64 max_result)
{
Float64 res = 0;
for (const auto & el : model)
{
/// Try to find bigram in the dictionary.
const auto * it = standard.find(el.getKey());
if (it != standard.end())
{
res += el.getMapped() * log(it->getMapped());
} else
{
res += el.getMapped() * log(zero_frequency);
}
/// If at some step the result becomes smaller than the current maximum, it makes no sense to compute it fully.
if (res < max_result)
{
return res;
}
}
return res;
}
/// Count how many times each bigram occurs in the text.
static ALWAYS_INLINE inline void calculateStats(
const UInt8 * data,
const size_t size,
HashMap<UInt16, UInt64> & model)
{
UInt16 hash = 0;
for (size_t i = 0; i < size; ++i)
{
hash <<= 8;
hash += *(data + i);
++model[hash];
}
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
const auto & encodings_freq = FrequencyHolder::getInstance().getEncodingsFrequency();
if (detect_language)
/// 2 chars for ISO code + 1 zero byte
res_data.reserve(offsets.size() * 3);
else
/// Mean charset length is 8
res_data.reserve(offsets.size() * 8);
res_offsets.resize(offsets.size());
size_t res_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
const UInt8 * str = data.data() + offsets[i - 1];
const size_t str_len = offsets[i] - offsets[i - 1] - 1;
std::string_view res;
HashMap<UInt16, UInt64> model;
calculateStats(str, str_len, model);
/// Go through the dictionary and find the charset with the highest weight
Float64 max_result = log(zero_frequency) * (max_string_size);
for (const auto & item : encodings_freq)
{
Float64 score = naiveBayes(item.map, model, max_result);
if (max_result < score)
{
max_result = score;
res = detect_language ? item.lang : item.name;
}
}
res_data.resize(res_offset + res.size() + 1);
memcpy(&res_data[res_offset], res.data(), res.size());
res_data[res_offset + res.size()] = 0;
res_offset += res.size() + 1;
res_offsets[i] = res_offset;
}
}
};
struct NameDetectCharset
{
static constexpr auto name = "detectCharset";
};
struct NameDetectLanguageUnknown
{
static constexpr auto name = "detectLanguageUnknown";
};
using FunctionDetectCharset = FunctionTextClassificationString<CharsetClassificationImpl<false>, NameDetectCharset>;
using FunctionDetectLanguageUnknown = FunctionTextClassificationString<CharsetClassificationImpl<true>, NameDetectLanguageUnknown>;
void registerFunctionDetectCharset(FunctionFactory & factory)
{
factory.registerFunction<FunctionDetectCharset>();
factory.registerFunction<FunctionDetectLanguageUnknown>();
}
}
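
A hedged numeric sketch of the scoring loop above, with std::map standing in for ClickHouse's HashMap: score the text's bigram counts against one dictionary model in log space, substitute zero_frequency for unseen bigrams, and cut off early once the score can no longer beat the best one found so far:

#include <cmath>
#include <cstdint>
#include <map>

// standard: bigram -> frequency in the dictionary; model: bigram -> count in the input text.
double naiveBayesScore(const std::map<uint16_t, double> & standard,
                       const std::map<uint16_t, uint64_t> & model,
                       double best_so_far)
{
    constexpr double zero_frequency = 1e-06; // floor for bigrams missing from the dictionary
    double res = 0;
    for (const auto & [bigram, count] : model)
    {
        auto it = standard.find(bigram);
        res += count * std::log(it != standard.end() ? it->second : zero_frequency);
        if (res < best_so_far) // cannot beat the current best; stop early
            return res;
    }
    return res;
}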

View File

@ -0,0 +1,231 @@
#include "config_functions.h"
#if USE_NLP
#include <Columns/ColumnMap.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/isValidUTF8.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsTextClassification.h>
#include <Interpreters/Context.h>
#include <compact_lang_det.h>
namespace DB
{
/* Determine language of Unicode UTF-8 text.
* Uses the cld2 library https://github.com/CLD2Owners/cld2
*/
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
extern const int SUPPORT_IS_DISABLED;
}
struct FunctionDetectLanguageImpl
{
static ALWAYS_INLINE inline std::string_view codeISO(std::string_view code_string)
{
if (code_string.ends_with("-Latn"))
code_string.remove_suffix(code_string.size() - 5);
if (code_string.ends_with("-Hant"))
code_string.remove_suffix(code_string.size() - 5);
// Old deprecated codes
if (code_string == "iw")
return "he";
if (code_string == "jw")
return "jv";
if (code_string == "in")
return "id";
if (code_string == "mo")
return "ro";
// Some languages do not have two-letter codes; for example, the code for Cebuano is "ceb"
if (code_string.size() != 2)
return "other";
return code_string;
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
/// Constant 3 is based on the fact that in general we need 2 characters for ISO code + 1 zero byte
res_data.reserve(offsets.size() * 3);
res_offsets.resize(offsets.size());
bool is_reliable;
size_t res_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
const UInt8 * str = data.data() + offsets[i - 1];
const size_t str_len = offsets[i] - offsets[i - 1] - 1;
std::string_view res;
if (UTF8::isValidUTF8(str, str_len))
{
auto lang = CLD2::DetectLanguage(reinterpret_cast<const char *>(str), str_len, true, &is_reliable);
res = codeISO(LanguageCode(lang));
}
else
{
res = "un";
}
res_data.resize(res_offset + res.size() + 1);
memcpy(&res_data[res_offset], res.data(), res.size());
res_data[res_offset + res.size()] = 0;
res_offset += res.size() + 1;
res_offsets[i] = res_offset;
}
}
};
class FunctionDetectLanguageMixed : public IFunction
{
public:
static constexpr auto name = "detectLanguageMixed";
/// Number of top results
static constexpr auto top_N = 3;
static FunctionPtr create(ContextPtr context)
{
if (!context->getSettingsRef().allow_experimental_nlp_functions)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Natural language processing function '{}' is experimental. Set `allow_experimental_nlp_functions` setting to enable it", name);
return std::make_shared<FunctionDetectLanguageMixed>();
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}. Must be String.",
arguments[0]->getName(), getName());
return std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeFloat32>());
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const auto & column = arguments[0].column;
const ColumnString * col = checkAndGetColumn<ColumnString>(column.get());
if (!col)
throw Exception(
"Illegal columns " + arguments[0].column->getName() + " of arguments of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
const auto & input_data = col->getChars();
const auto & input_offsets = col->getOffsets();
/// Create and fill the result map.
const auto & result_type_map = static_cast<const DataTypeMap &>(*result_type);
const DataTypePtr & key_type = result_type_map.getKeyType();
const DataTypePtr & value_type = result_type_map.getValueType();
MutableColumnPtr keys_data = key_type->createColumn();
MutableColumnPtr values_data = value_type->createColumn();
MutableColumnPtr offsets = DataTypeNumber<IColumn::Offset>().createColumn();
size_t total_elements = input_rows_count * top_N;
keys_data->reserve(total_elements);
values_data->reserve(total_elements);
offsets->reserve(input_rows_count);
bool is_reliable;
CLD2::Language result_lang_top3[top_N];
int32_t pc[top_N];
int bytes[top_N];
IColumn::Offset current_offset = 0;
for (size_t i = 0; i < input_rows_count; ++i)
{
const UInt8 * str = input_data.data() + input_offsets[i - 1];
const size_t str_len = input_offsets[i] - input_offsets[i - 1] - 1;
if (UTF8::isValidUTF8(str, str_len))
{
CLD2::DetectLanguageSummary(reinterpret_cast<const char *>(str), str_len, true, result_lang_top3, pc, bytes, &is_reliable);
for (size_t j = 0; j < top_N; ++j)
{
if (pc[j] == 0)
break;
auto res_str = FunctionDetectLanguageImpl::codeISO(LanguageCode(result_lang_top3[j]));
Float32 res_float = static_cast<Float32>(pc[j]) / 100;
keys_data->insertData(res_str.data(), res_str.size());
values_data->insertData(reinterpret_cast<const char *>(&res_float), sizeof(res_float));
++current_offset;
}
}
else
{
std::string_view res_str = "un";
Float32 res_float = 0;
keys_data->insertData(res_str.data(), res_str.size());
values_data->insertData(reinterpret_cast<const char *>(&res_float), sizeof(res_float));
++current_offset;
}
offsets->insert(current_offset);
}
auto nested_column = ColumnArray::create(
ColumnTuple::create(Columns{std::move(keys_data), std::move(values_data)}),
std::move(offsets));
return ColumnMap::create(nested_column);
}
};
struct NameDetectLanguage
{
static constexpr auto name = "detectLanguage";
};
using FunctionDetectLanguage = FunctionTextClassificationString<FunctionDetectLanguageImpl, NameDetectLanguage>;
void registerFunctionsDetectLanguage(FunctionFactory & factory)
{
factory.registerFunction<FunctionDetectLanguage>();
factory.registerFunction<FunctionDetectLanguageMixed>();
}
}
#endif
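
A small self-contained check of the normalization rules codeISO implements (a sketch using only the standard library and C++20 string_view::ends_with; the inputs are illustrative):

#include <cassert>
#include <string_view>

static std::string_view code_iso(std::string_view s)
{
    if (s.ends_with("-Latn") || s.ends_with("-Hant"))
        s.remove_suffix(5);         // strip the 5-character script suffix
    if (s == "iw") return "he";     // deprecated Hebrew code
    if (s == "jw") return "jv";     // deprecated Javanese code
    if (s == "in") return "id";     // deprecated Indonesian code
    if (s == "mo") return "ro";     // deprecated Moldavian code
    return s.size() == 2 ? s : std::string_view{"other"};
}

int main()
{
    assert(code_iso("sr-Latn") == "sr");
    assert(code_iso("iw") == "he");
    assert(code_iso("ceb") == "other"); // no two-letter code for Cebuano
}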

View File

@ -0,0 +1,120 @@
#include <Common/FrequencyHolder.h>
#include <Common/StringUtils/StringUtils.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsTextClassification.h>
#include <unordered_map>
#include <string_view>
namespace DB
{
/**
* Determine the programming language from the source code.
* We calculate all the unigrams and bigrams of commands in the source code.
* Then, using a marked-up dictionary with the weights of command unigrams and bigrams for various programming languages,
* we find the language with the biggest weight and return it.
*/
struct FunctionDetectProgrammingLanguageImpl
{
/// Calculate total weight
static ALWAYS_INLINE inline Float64 stateMachine(
const FrequencyHolder::Map & standard,
const std::unordered_map<String, Float64> & model)
{
Float64 res = 0;
for (const auto & el : model)
{
/// Try to find each n-gram in dictionary
const auto * it = standard.find(el.first);
if (it != standard.end())
res += el.second * it->getMapped();
}
return res;
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
const auto & programming_freq = FrequencyHolder::getInstance().getProgrammingFrequency();
/// Constant 5 is arbitrary
res_data.reserve(offsets.size() * 5);
res_offsets.resize(offsets.size());
size_t res_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
const UInt8 * str = data.data() + offsets[i - 1];
const size_t str_len = offsets[i] - offsets[i - 1] - 1;
std::unordered_map<String, Float64> data_freq;
StringRef prev_command;
StringRef command;
/// Select all commands from the string
for (size_t ind = 0; ind < str_len; ++ind)
{
/// Assume that all commands are split by spaces
if (isWhitespaceASCII(str[ind]))
continue;
size_t prev_ind = ind;
while (ind < str_len && !isWhitespaceASCII(str[ind]))
++ind;
command = {str + prev_ind, ind - prev_ind};
/// We add both unigrams and bigrams to later search for them in the dictionary
if (prev_command.data)
data_freq[prev_command.toString() + command.toString()] += 1;
data_freq[command.toString()] += 1;
prev_command = command;
}
std::string_view res;
Float64 max_result = 0;
/// Iterate over all programming languages and find the language with the highest weight
for (const auto & item : programming_freq)
{
Float64 result = stateMachine(item.map, data_freq);
if (result > max_result)
{
max_result = result;
res = item.name;
}
}
/// If all weights are zero, then we assume that the language is undefined
if (res.empty())
res = "Undefined";
res_data.resize(res_offset + res.size() + 1);
memcpy(&res_data[res_offset], res.data(), res.size());
res_data[res_offset + res.size()] = 0;
res_offset += res.size() + 1;
res_offsets[i] = res_offset;
}
}
};
struct NameDetectProgrammingLanguage
{
static constexpr auto name = "detectProgrammingLanguage";
};
using FunctionDetectProgrammingLanguage = FunctionTextClassificationString<FunctionDetectProgrammingLanguageImpl, NameDetectProgrammingLanguage>;
void registerFunctionDetectProgrammingLanguage(FunctionFactory & factory)
{
factory.registerFunction<FunctionDetectProgrammingLanguage>();
}
}
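
The tokenization loop above builds the n-gram table inline; a hedged standalone sketch of the same unigram/bigram extraction with std containers (commandNGrams is a hypothetical name):

#include <cctype>
#include <string>
#include <string_view>
#include <unordered_map>

// Tokens are whitespace-separated "commands"; a bigram is the concatenation
// of two adjacent tokens, matching the loop above.
std::unordered_map<std::string, double> commandNGrams(std::string_view src)
{
    std::unordered_map<std::string, double> freq;
    std::string prev;
    size_t i = 0;
    while (i < src.size())
    {
        while (i < src.size() && std::isspace((unsigned char) src[i]))
            ++i;
        size_t start = i;
        while (i < src.size() && !std::isspace((unsigned char) src[i]))
            ++i;
        if (start == i)
            break; // only trailing whitespace remained
        std::string token(src.substr(start, i - start));
        if (!prev.empty())
            freq[prev + token] += 1; // bigram
        freq[token] += 1;            // unigram
        prev = std::move(token);
    }
    return freq;
}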

View File

@ -0,0 +1,122 @@
#pragma once
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context_fwd.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
namespace DB
{
/// Functions for text classification with different result types
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
extern const int SUPPORT_IS_DISABLED;
}
template <typename Impl, typename Name>
class FunctionTextClassificationString : public IFunction
{
public:
static constexpr auto name = Name::name;
static FunctionPtr create(ContextPtr context)
{
if (!context->getSettingsRef().allow_experimental_nlp_functions)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Natural language processing function '{}' is experimental. Set `allow_experimental_nlp_functions` setting to enable it", name);
return std::make_shared<FunctionTextClassificationString>();
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}. Must be String.",
arguments[0]->getName(), getName());
return arguments[0];
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & /*result_type*/, size_t /*input_rows_count*/) const override
{
const ColumnPtr & column = arguments[0].column;
const ColumnString * col = checkAndGetColumn<ColumnString>(column.get());
if (!col)
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
auto col_res = ColumnString::create();
Impl::vector(col->getChars(), col->getOffsets(), col_res->getChars(), col_res->getOffsets());
return col_res;
}
};
template <typename Impl, typename Name>
class FunctionTextClassificationFloat : public IFunction
{
public:
static constexpr auto name = Name::name;
static FunctionPtr create(ContextPtr context)
{
if (!context->getSettingsRef().allow_experimental_nlp_functions)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Natural language processing function '{}' is experimental. Set `allow_experimental_nlp_functions` setting to enable it", name);
return std::make_shared<FunctionTextClassificationFloat>();
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}. Must be String.",
arguments[0]->getName(), getName());
return std::make_shared<DataTypeFloat32>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & /*result_type*/, size_t /*input_rows_count*/) const override
{
const ColumnPtr & column = arguments[0].column;
const ColumnString * col = checkAndGetColumn<ColumnString>(column.get());
if (!col)
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
auto col_res = ColumnVector<Float32>::create();
ColumnVector<Float32>::Container & vec_res = col_res->getData();
vec_res.resize(col->size());
Impl::vector(col->getChars(), col->getOffsets(), vec_res);
return col_res;
}
};
}
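
To illustrate the contract these templates expect from Impl, here is a hypothetical, minimal Impl (not part of the commit): vector() reads zero-terminated rows described by (data, offsets) and writes zero-terminated rows to (res_data, res_offsets), mirroring the pattern used by the detect* implementations above.

#include <Columns/ColumnString.h>
#include <Functions/FunctionsTextClassification.h>
#include <cstring>
#include <string_view>

// Hypothetical Impl: classifies each row as "empty" or "non-empty".
struct TrivialClassifierImpl
{
    static void vector(
        const DB::ColumnString::Chars & /*data*/,
        const DB::ColumnString::Offsets & offsets,
        DB::ColumnString::Chars & res_data,
        DB::ColumnString::Offsets & res_offsets)
    {
        res_offsets.resize(offsets.size());
        size_t res_offset = 0;
        for (size_t i = 0; i < offsets.size(); ++i)
        {
            /// offsets[i - 1] is 0 for i == 0 thanks to the padded offsets array.
            const size_t str_len = offsets[i] - offsets[i - 1] - 1;
            std::string_view res = str_len ? "non-empty" : "empty";
            res_data.resize(res_offset + res.size() + 1);
            memcpy(&res_data[res_offset], res.data(), res.size());
            res_data[res_offset + res.size()] = 0; /// zero terminator, as in the functions above
            res_offset += res.size() + 1;
            res_offsets[i] = res_offset;
        }
    }
};

struct NameTrivialClassifier { static constexpr auto name = "trivialClassifier"; };
using FunctionTrivialClassifier
    = DB::FunctionTextClassificationString<TrivialClassifierImpl, NameTrivialClassifier>;

Registering it via factory.registerFunction<FunctionTrivialClassifier>() would then expose it like the detect* functions.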

View File

@ -0,0 +1,89 @@
#include <Common/FrequencyHolder.h>
#include <Common/StringUtils/StringUtils.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsTextClassification.h>
#include <unordered_map>
namespace DB
{
/**
* Determines the sentiment of text data.
* Uses a marked-up sentiment dictionary in which each word has a tonality ranging from -12 to 6.
* For each text, we calculate the average sentiment value of its words and return it in the range [-1, 1].
*/
struct FunctionDetectTonalityImpl
{
static ALWAYS_INLINE inline Float32 detectTonality(
const UInt8 * str,
const size_t str_len,
const FrequencyHolder::Map & emotional_dict)
{
Float64 weight = 0;
UInt64 count_words = 0;
String word;
/// Select all Russian words from the string
for (size_t ind = 0; ind < str_len; ++ind)
{
/// Split words by whitespaces and punctuation signs
if (isWhitespaceASCII(str[ind]) || isPunctuationASCII(str[ind]))
continue;
while (ind < str_len && !(isWhitespaceASCII(str[ind]) || isPunctuationASCII(str[ind])))
{
word.push_back(str[ind]);
++ind;
}
/// Try to find a Russian word in the tonality dictionary
const auto * it = emotional_dict.find(word);
if (it != emotional_dict.end())
{
count_words += 1;
weight += it->getMapped();
}
word.clear();
}
if (!count_words)
return 0;
/// Calculate average value of tonality.
/// Convert values -12..6 to -1..1
if (weight > 0)
return weight / count_words / 6;
else
return weight / count_words / 12;
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
PaddedPODArray<Float32> & res)
{
const auto & emotional_dict = FrequencyHolder::getInstance().getEmotionalDict();
size_t size = offsets.size();
size_t prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
res[i] = detectTonality(data.data() + prev_offset, offsets[i] - 1 - prev_offset, emotional_dict);
prev_offset = offsets[i];
}
}
};
struct NameDetectTonality
{
static constexpr auto name = "detectTonality";
};
using FunctionDetectTonality = FunctionTextClassificationFloat<FunctionDetectTonalityImpl, NameDetectTonality>;
void registerFunctionDetectTonality(FunctionFactory & factory)
{
factory.registerFunction<FunctionDetectTonality>();
}
}
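
As a worked example of the normalization above: a text containing three dictionary words with tonalities 4, 2, and 3 accumulates weight 9 over count_words 3, so the result is 9 / 3 / 6 = 0.5; a text whose words average -6 would map to -6 / 12 = -0.5, keeping the output within [-1, 1].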

View File

@ -8,4 +8,5 @@
#cmakedefine01 USE_H3
#cmakedefine01 USE_S2_GEOMETRY
#cmakedefine01 USE_FASTOPS
#cmakedefine01 USE_NLP
#cmakedefine01 USE_HYPERSCAN

View File

@ -39,6 +39,9 @@ void registerFunctionEncodeXMLComponent(FunctionFactory &);
void registerFunctionDecodeXMLComponent(FunctionFactory &);
void registerFunctionExtractTextFromHTML(FunctionFactory &);
void registerFunctionToStringCutToZero(FunctionFactory &);
void registerFunctionDetectCharset(FunctionFactory &);
void registerFunctionDetectTonality(FunctionFactory &);
void registerFunctionDetectProgrammingLanguage(FunctionFactory &);
#if USE_BASE64
void registerFunctionBase64Encode(FunctionFactory &);
@ -50,6 +53,7 @@ void registerFunctionTryBase64Decode(FunctionFactory &);
void registerFunctionStem(FunctionFactory &);
void registerFunctionSynonyms(FunctionFactory &);
void registerFunctionLemmatize(FunctionFactory &);
void registerFunctionsDetectLanguage(FunctionFactory &);
#endif
#if USE_ICU
@ -91,6 +95,9 @@ void registerFunctionsString(FunctionFactory & factory)
registerFunctionDecodeXMLComponent(factory);
registerFunctionExtractTextFromHTML(factory);
registerFunctionToStringCutToZero(factory);
registerFunctionDetectCharset(factory);
registerFunctionDetectTonality(factory);
registerFunctionDetectProgrammingLanguage(factory);
#if USE_BASE64
registerFunctionBase64Encode(factory);
@ -102,6 +109,7 @@ void registerFunctionsString(FunctionFactory & factory)
registerFunctionStem(factory);
registerFunctionSynonyms(factory);
registerFunctionLemmatize(factory);
registerFunctionsDetectLanguage(factory);
#endif
#if USE_ICU

View File

@ -35,11 +35,9 @@ namespace ClusterProxy
SelectStreamFactory::SelectStreamFactory(
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
bool has_virtual_shard_num_column_)
: header(header_),
processed_stage{processed_stage_},
has_virtual_shard_num_column(has_virtual_shard_num_column_)
QueryProcessingStage::Enum processed_stage_)
: header(header_)
, processed_stage{processed_stage_}
{
}
@ -102,19 +100,15 @@ void SelectStreamFactory::createForShard(
Shards & remote_shards,
UInt32 shard_count)
{
auto modified_query_ast = query_ast->clone();
if (has_virtual_shard_num_column)
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(modified_query_ast, header, context, processed_stage, shard_info.shard_num, shard_count));
local_plans.emplace_back(createLocalPlan(query_ast, header, context, processed_stage, shard_info.shard_num, shard_count));
};
auto emplace_remote_stream = [&](bool lazy = false, UInt32 local_delay = 0)
{
remote_shards.emplace_back(Shard{
.query = modified_query_ast,
.query = query_ast,
.header = header,
.shard_num = shard_info.shard_num,
.num_replicas = shard_info.getAllNodeCount(),

View File

@ -16,8 +16,7 @@ class SelectStreamFactory final : public IStreamFactory
public:
SelectStreamFactory(
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
bool has_virtual_shard_num_column_);
QueryProcessingStage::Enum processed_stage_);
void createForShard(
const Cluster::ShardInfo & shard_info,
@ -32,8 +31,6 @@ public:
private:
const Block header;
QueryProcessingStage::Enum processed_stage;
bool has_virtual_shard_num_column = false;
};
}

View File

@ -32,15 +32,12 @@
#include <base/scope_guard.h>
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
@ -114,13 +111,12 @@ std::shared_ptr<TSystemLog> createSystemLog(
return std::make_shared<TSystemLog>(context, database, table, engine, flush_interval_milliseconds);
}
}
///
/// ISystemLog
///
ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextPtr context)
/// Returns the CREATE TABLE query, but with the following removed:
/// - UUID
/// - SETTINGS (for MergeTree)
/// That way it can be compared with SystemLog::getCreateTableQuery().
ASTPtr getCreateTableQueryClean(const StorageID & table_id, ContextPtr context)
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
ASTPtr old_ast = database->getCreateTableQuery(table_id.table_name, context);
@ -135,37 +131,8 @@ ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextP
return old_ast;
}
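As an aside, the stored DDL that this helper normalizes can be inspected with a plain SHOW CREATE TABLE (a sketch; whether the UUID appears in the output depends on server settings):
```
-- getCreateTableQueryClean() compares a normalized form of this DDL
-- against the result of SystemLog::getCreateTableQuery().
SHOW CREATE TABLE system.query_log;
```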
void ISystemLog::stopFlushThread()
{
{
std::lock_guard lock(mutex);
if (!saving_thread.joinable())
{
return;
}
if (is_shutdown)
{
return;
}
is_shutdown = true;
/// Tell thread to shutdown.
flush_event.notify_all();
}
saving_thread.join();
}
void ISystemLog::startup()
{
std::lock_guard lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
}
///
/// SystemLogs
///
@ -270,77 +237,6 @@ SystemLog<LogElement>::SystemLog(
log = &Poco::Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
}
static thread_local bool recursive_add_call = false;
template <typename LogElement>
void SystemLog<LogElement>::add(const LogElement & element)
{
/// It is possible that the method will be called recursively.
/// Better to drop these events to avoid complications.
if (recursive_add_call)
return;
recursive_add_call = true;
SCOPE_EXIT({ recursive_add_call = false; });
/// Memory can be allocated while resizing on queue.push_back.
/// The size of allocation can be in order of a few megabytes.
/// But this should not be accounted for query memory usage.
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flaky.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
/// Should not log messages under mutex.
bool queue_is_half_full = false;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2)
{
queue_is_half_full = true;
// The queue is more than half full, time to flush.
// We only check for strict equality, because messages are added one
// by one, under exclusive lock, so we will see each message count.
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_up_to < queue_end)
requested_flush_up_to = queue_end;
flush_event.notify_all();
}
if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE)
{
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index);
}
return;
}
queue.push_back(element);
}
if (queue_is_half_full)
LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name()));
}
template <typename LogElement>
void SystemLog<LogElement>::shutdown()
{
@ -351,48 +247,6 @@ void SystemLog<LogElement>::shutdown()
table->flushAndShutdown();
}
template <typename LogElement>
void SystemLog<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= force;
requested_flush_up_to = std::max(requested_flush_up_to,
this_thread_requested_offset);
flush_event.notify_all();
}
LOG_DEBUG(log, "Requested flush up to offset {}",
this_thread_requested_offset);
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
[&] { return flushed_up_to >= this_thread_requested_offset
&& !is_force_prepare_tables; });
if (!result)
{
throw Exception("Timeout exceeded (" + toString(timeout_seconds) + " s) while flushing system log '" + demangle(typeid(*this).name()) + "'.",
ErrorCodes::TIMEOUT_EXCEEDED);
}
}
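This synchronous flush is what backs the SYSTEM FLUSH LOGS statement; a minimal usage sketch:
```
-- Forces creation of the log tables and waits until all queued entries
-- are flushed, throwing TIMEOUT_EXCEEDED after 180 seconds:
SYSTEM FLUSH LOGS;
```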
template <typename LogElement>
void SystemLog<LogElement>::savingThreadFunction()
{
@ -625,17 +479,7 @@ ASTPtr SystemLog<LogElement>::getCreateTableQuery()
return create;
}
template class SystemLog<AsynchronousMetricLogElement>;
template class SystemLog<CrashLogElement>;
template class SystemLog<MetricLogElement>;
template class SystemLog<OpenTelemetrySpanLogElement>;
template class SystemLog<PartLogElement>;
template class SystemLog<QueryLogElement>;
template class SystemLog<QueryThreadLogElement>;
template class SystemLog<QueryViewsLogElement>;
template class SystemLog<SessionLogElement>;
template class SystemLog<TraceLogElement>;
template class SystemLog<ZooKeeperLogElement>;
template class SystemLog<TextLogElement>;
#define INSTANTIATE_SYSTEM_LOG(ELEMENT) template class SystemLog<ELEMENT>;
SYSTEM_LOG_ELEMENTS(INSTANTIATE_SYSTEM_LOG)
}

View File

@ -1,34 +1,12 @@
#pragma once
#include <thread>
#include <atomic>
#include <memory>
#include <vector>
#include <condition_variable>
#include <boost/noncopyable.hpp>
#include <Common/SystemLogBase.h>
#include <base/types.h>
#include <Core/Defines.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/StorageID.h>
#include <Parsers/IAST_fwd.h>
#include <Common/ThreadPool.h>
namespace Poco
{
class Logger;
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
/** Allow to store structured log in system table.
*
* Logging is asynchronous. Data is put into a queue, from which it is read by a separate thread.
@ -66,44 +44,6 @@ class QueryViewsLog;
class ZooKeeperLog;
class SessionLog;
class ISystemLog
{
public:
virtual String getName() = 0;
/// force -- force table creation (used for SYSTEM FLUSH LOGS)
virtual void flush(bool force = false) = 0;
virtual void prepareTable() = 0;
/// Start the background thread.
virtual void startup();
/// Stop the background flush thread before destructor. No more data will be written.
virtual void shutdown() = 0;
virtual ~ISystemLog() = default;
virtual void savingThreadFunction() = 0;
/// Returns the CREATE TABLE query, but with the following removed:
/// - UUID
/// - SETTINGS (for MergeTree)
/// That way it can be compared with the result of SystemLog::getCreateTableQuery()
static ASTPtr getCreateTableQueryClean(const StorageID & table_id, ContextPtr context);
protected:
ThreadFromGlobalPool saving_thread;
/// Data shared between callers of add()/flush()/shutdown(), and the saving thread
std::mutex mutex;
bool is_shutdown = false;
std::condition_variable flush_event;
void stopFlushThread();
};
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
struct SystemLogs
@ -136,10 +76,11 @@ struct SystemLogs
template <typename LogElement>
class SystemLog : public ISystemLog, private boost::noncopyable, WithContext
class SystemLog : public SystemLogBase<LogElement>, private boost::noncopyable, WithContext
{
public:
using Self = SystemLog;
using Base = SystemLogBase<LogElement>;
/** Parameter: table name where to write log.
* If the table does not exist, it is created with the specified engine.
@ -156,27 +97,23 @@ public:
const String & storage_def_,
size_t flush_interval_milliseconds_);
/** Append a record into log.
* Writing to the table is done asynchronously and, in case of failure, the record could be lost.
*/
void add(const LogElement & element);
void shutdown() override;
/// Flush data in the buffer to disk
void flush(bool force) override;
String getName() override
{
return LogElement::name();
}
ASTPtr getCreateTableQuery();
protected:
Poco::Logger * log;
using ISystemLog::mutex;
using ISystemLog::is_shutdown;
using ISystemLog::flush_event;
using ISystemLog::stopFlushThread;
using Base::log;
using Base::queue;
using Base::queue_front_index;
using Base::is_force_prepare_tables;
using Base::requested_flush_up_to;
using Base::flushed_up_to;
using Base::logged_queue_full_at_index;
private:
/* Saving thread data */
const StorageID table_id;
const String storage_def;
@ -185,32 +122,17 @@ private:
bool is_prepared = false;
const size_t flush_interval_milliseconds;
// Queue is bounded. But its size is quite large to not block in all normal cases.
std::vector<LogElement> queue;
// An always-incrementing index of the first message currently in the queue.
// We use it to give a global sequential index to every message, so that we
// can wait until a particular message is flushed. This is used to implement
// synchronous log flushing for SYSTEM FLUSH LOGS.
uint64_t queue_front_index = 0;
// A flag that says we must create the tables even if the queue is empty.
bool is_force_prepare_tables = false;
// Requested to flush logs up to this index, exclusive
uint64_t requested_flush_up_to = 0;
// Flushed log up to this index, exclusive
uint64_t flushed_up_to = 0;
// Logged overflow message at this queue front index
uint64_t logged_queue_full_at_index = -1;
void savingThreadFunction() override;
/** Creates new table if it does not exist.
* Renames old table if its structure is not suitable.
* This cannot be done in the constructor, to avoid a deadlock while renaming a table under a locked Context when the SystemLog object is created.
*/
void prepareTable() override;
void savingThreadFunction() override;
/// flushImpl can be executed only in saving_thread.
void flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end);
ASTPtr getCreateTableQuery();
};
}

View File

@ -792,6 +792,39 @@ void markTupleLiteralsAsLegacy(ASTPtr & query)
MarkTupleLiteralsAsLegacyVisitor(data).visit(query);
}
/// Rewrite _shard_num -> shardNum() AS _shard_num
struct RewriteShardNum
{
struct Data
{
};
static bool needChildVisit(const ASTPtr & parent, const ASTPtr & /*child*/)
{
/// ON section should not be rewritten.
return typeid_cast<ASTTableJoin *>(parent.get()) == nullptr;
}
static void visit(ASTPtr & ast, Data &)
{
if (auto * identifier = typeid_cast<ASTIdentifier *>(ast.get()))
visit(*identifier, ast);
}
static void visit(ASTIdentifier & identifier, ASTPtr & ast)
{
if (identifier.shortName() != "_shard_num")
return;
String alias = identifier.tryGetAlias();
if (alias.empty())
alias = "_shard_num";
ast = makeASTFunction("shardNum");
ast->setAlias(alias);
}
};
using RewriteShardNumVisitor = InDepthNodeVisitor<RewriteShardNum, true>;
}
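In effect (a sketch, matching the tests added in this commit), reading the _shard_num virtual column of a Distributed table is now rewritten to a shardNum() call that keeps the original name:
```
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one);
-- is executed as the equivalent of:
SELECT shardNum() AS _shard_num, * FROM remote('127.0.0.{1,2}', system.one);
```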
TreeRewriterResult::TreeRewriterResult(
@ -962,6 +995,7 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
++it;
}
has_virtual_shard_num = false;
/// If there are virtual columns among the unknown columns, remove them from the list of unknown columns and add
/// them to the columns list, so that they are also considered during further processing.
if (storage)
@ -978,6 +1012,18 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
else
++it;
}
if (is_remote_storage)
{
for (const auto & name_type : storage_virtuals)
{
if (name_type.name == "_shard_num" && storage->isVirtualColumn("_shard_num", metadata_snapshot))
{
has_virtual_shard_num = true;
break;
}
}
}
}
if (!unknown_required_source_columns.empty())
@ -1165,6 +1211,13 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
}
}
/// Rewrite _shard_num to shardNum()
if (result.has_virtual_shard_num)
{
RewriteShardNumVisitor::Data data_rewrite_shard_num;
RewriteShardNumVisitor(data_rewrite_shard_num).visit(query);
}
result.ast_join = select_query->join();
if (result.optimize_trivial_count)

View File

@ -70,6 +70,9 @@ struct TreeRewriterResult
/// Cache isRemote() call for storage, because it may be too heavy.
bool is_remote_storage = false;
/// Rewrite _shard_num to shardNum()
bool has_virtual_shard_num = false;
/// Results of scalar subqueries
Scalars scalars;

View File

@ -61,6 +61,7 @@
#include <Processors/Sources/WaitForAsyncInsertSource.h>
#include <base/EnumReflection.h>
#include <base/demangle.h>
#include <random>
@ -659,7 +660,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (context->query_trace_context.trace_id != UUID())
{
auto * raw_interpreter_ptr = interpreter.get();
std::string class_name(abi::__cxa_demangle(typeid(*raw_interpreter_ptr).name(), nullptr, nullptr, nullptr));
std::string class_name(demangle(typeid(*raw_interpreter_ptr).name()));
span = std::make_unique<OpenTelemetrySpanHolder>(class_name + "::execute()");
}
res = interpreter->execute();

View File

@ -32,6 +32,15 @@ void ASTFunction::appendColumnNameImpl(WriteBuffer & ostr) const
if (name == "view")
throw Exception("Table function view cannot be used as an expression", ErrorCodes::UNEXPECTED_EXPRESSION);
/// If the function can be converted to a literal, it will be parsed as a literal after formatting.
/// In distributed queries this may lead to mismatched column names.
/// To avoid that, we check whether the function can be converted to a literal.
if (auto literal = toLiteral())
{
literal->appendColumnName(ostr);
return;
}
writeString(name, ostr);
if (parameters)
@ -111,31 +120,42 @@ void ASTFunction::updateTreeHashImpl(SipHash & hash_state) const
IAST::updateTreeHashImpl(hash_state);
}
template <typename Container>
static ASTPtr createLiteral(const ASTs & arguments)
{
Container container;
for (const auto & arg : arguments)
{
if (const auto * literal = arg->as<ASTLiteral>())
{
container.push_back(literal->value);
}
else if (auto * func = arg->as<ASTFunction>())
{
if (auto func_literal = func->toLiteral())
container.push_back(func_literal->as<ASTLiteral>()->value);
else
return {};
}
else
/// One of the Array or Tuple arguments is not a literal
return {};
}
return std::make_shared<ASTLiteral>(container);
}
ASTPtr ASTFunction::toLiteral() const
{
if (!arguments) return {};
if (!arguments)
return {};
if (name == "array")
{
Array array;
return createLiteral<Array>(arguments->children);
for (const auto & arg : arguments->children)
{
if (auto * literal = arg->as<ASTLiteral>())
array.push_back(literal->value);
else if (auto * func = arg->as<ASTFunction>())
{
if (auto func_literal = func->toLiteral())
array.push_back(func_literal->as<ASTLiteral>()->value);
}
else
/// One of the Array arguments is not a literal
return {};
}
return std::make_shared<ASTLiteral>(array);
}
if (name == "tuple")
return createLiteral<Tuple>(arguments->children);
return {};
}
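A sketch of the effect, mirroring the tests added in this commit: the initiator and the remote shards now derive the same column name for function calls that fold to literals:
```
-- Previously the remote side could format array(0, 1) as the literal [0, 1],
-- producing a column name that differed from the one the initiator expected:
SELECT any(array(0, 1)) AS k FROM remote('127.0.0.{1,2}', numbers(10));
```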

View File

@ -217,7 +217,6 @@ public:
/// Extract data from the backup and put it to the storage.
virtual RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context);
protected:
/// Returns whether the column is virtual - by default all columns are real.
/// An initially reserved virtual column name may be shadowed by a real column.
bool isVirtualColumn(const String & column_name, const StorageMetadataPtr & metadata_snapshot) const;

View File

@ -308,7 +308,7 @@ NamesAndTypesList StorageDistributed::getVirtuals() const
NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()),
NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()), /// deprecated
};
}
@ -605,8 +605,8 @@ Pipe StorageDistributed::read(
void StorageDistributed::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const Names &,
const StorageMetadataPtr &,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -635,10 +635,6 @@ void StorageDistributed::read(
return;
}
bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num", metadata_snapshot))
has_virtual_shard_num_column = false;
StorageID main_table = StorageID::createEmpty();
if (!remote_table_function_ptr)
main_table = StorageID{remote_database, remote_table};
@ -646,8 +642,7 @@ void StorageDistributed::read(
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
header,
processed_stage,
has_virtual_shard_num_column);
processed_stage);
ClusterProxy::executeQuery(
query_plan, header, processed_stage,

View File

@ -40,7 +40,6 @@ StorageSQLite::StorageSQLite(
, WithContext(context_->getGlobalContext())
, remote_table_name(remote_table_name_)
, database_path(database_path_)
, global_context(context_)
, sqlite_db(sqlite_db_)
, log(&Poco::Logger::get("StorageSQLite (" + table_id_.table_name + ")"))
{

View File

@ -48,7 +48,6 @@ public:
private:
String remote_table_name;
String database_path;
ContextPtr global_context;
SQLitePtr sqlite_db;
Poco::Logger * log;
};

View File

@ -0,0 +1,20 @@
<test>
<settings>
<allow_experimental_nlp_functions>1</allow_experimental_nlp_functions>
</settings>
<preconditions>
<table_exists>hits_100m_single</table_exists>
</preconditions>
<query>SELECT detectLanguage(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
<query>SELECT detectLanguageMixed(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
<query>SELECT detectTonality(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
<!-- The input is not really appropriate for these functions,
but it at least gives some idea of their performance -->
<query>SELECT detectProgrammingLanguage(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
<query>SELECT detectLanguageUnknown(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
<query>SELECT detectCharset(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
</test>
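For orientation, a minimal standalone invocation of these functions (assuming a build with the cld2 and nlp-data submodules, as in the functional test below):
```
SET allow_experimental_nlp_functions = 1;
SELECT detectLanguage('A vaincre sans peril, on triomphe sans gloire.'); -- fr
```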

View File

@ -90,3 +90,31 @@
21
22
23
6
7
7
5
6
7
8
9
10
11
12
13
14
15
16
17
5
6
7
8
9
10
11
12
13
14
15
16

View File

@ -93,3 +93,34 @@ SELECT position(concat(' иголка.ру', arrayStringConcat
SELECT position(concat(' иголка.ру', arrayStringConcat(arrayMap(x -> ' ', range(20000)))), 'иголка.ру') AS res;
SELECT position(concat(' иголка.ру', arrayStringConcat(arrayMap(x -> ' ', range(20000)))), 'иголка.ру') AS res;
SELECT position(concat(' иголка.ру', arrayStringConcat(arrayMap(x -> ' ', range(20000)))), 'иголка.ру') AS res;
SELECT positionCaseInsensitiveUTF8(materialize('test ß test'), 'ß') AS res;
SELECT positionCaseInsensitiveUTF8(materialize('test AaßAa test'), 'aßa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize('test A1ß2a test'), '1ß2') AS res;
SELECT positionCaseInsensitiveUTF8(materialize('xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest'), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat('test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' test a1ßAa test', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'a1ẞaa') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat('xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;
SELECT positionCaseInsensitiveUTF8(materialize(concat(' xẞyyaa1ẞ1yzẞXẞẞ1ẞẞ1bctest', arrayStringConcat(arrayMap(x -> ' ', range(20000))))), 'aa1ẞ1Yzßxßß1ßß1BC') AS res;

View File

@ -1,34 +1,94 @@
-- { echoOn }
-- remote(system.one)
SELECT 'remote(system.one)';
remote(system.one)
SELECT * FROM remote('127.0.0.1', system.one);
0
SELECT * FROM remote('127.0.0.{1,2}', system.one);
0
0
0
SELECT _shard_num, * FROM remote('127.0.0.1', system.one);
1 0
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one) order by _shard_num;
1 0
2 0
SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one) WHERE _shard_num = 1;
1 0
-- dist_1 using test_shard_localhost
SELECT 'dist_1';
dist_1
1
1 10
10
SELECT _shard_num FROM dist_1 order by _shard_num;
1
1
SELECT _shard_num FROM dist_1 order by _shard_num;
1
1
SELECT _shard_num, key FROM dist_1 order by _shard_num;
1 10
1 20
SELECT key FROM dist_1;
10
20
SELECT _shard_num FROM dist_1 order by _shard_num;
1
1
SELECT _shard_num, key FROM dist_1 order by _shard_num, key;
1 10
1 20
SELECT key FROM dist_1;
10
20
-- dist_2 using test_cluster_two_shards_localhost
SELECT 'dist_2';
dist_2
SELECT _shard_num FROM dist_2 order by _shard_num;
1
2
SELECT _shard_num FROM dist_2 order by _shard_num;
1
2
SELECT _shard_num, key FROM dist_2 order by _shard_num, key;
1 100
2 100
SELECT key FROM dist_2;
100
100
-- multiple _shard_num
SELECT 'remote(Distributed)';
remote(Distributed)
SELECT _shard_num, key FROM remote('127.0.0.1', currentDatabase(), dist_2) order by _shard_num, key;
1 100
1 100
2 100
-- JOIN system.clusters
SELECT 'JOIN system.clusters';
JOIN system.clusters
SELECT a._shard_num, a.key, b.host_name, b.host_address IN ('::1', '127.0.0.1'), b.port
FROM (SELECT *, _shard_num FROM dist_1) a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost';
1 10 localhost 1 9000
1 20 localhost 1 9000
SELECT _shard_num, key, b.host_name, b.host_address IN ('::1', '127.0.0.1'), b.port
FROM dist_1 a
JOIN system.clusters b
ON _shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 403 }
SELECT 'Rewrite with alias';
Rewrite with alias
SELECT a._shard_num, key FROM dist_1 a;
1 10
1 20
-- the same with JOIN, just in case
SELECT a._shard_num, a.key, b.host_name, b.host_address IN ('::1', '127.0.0.1'), b.port
FROM dist_1 a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 47; }
SELECT 'dist_3';
dist_3
SELECT * FROM dist_3;
100 foo
SELECT _shard_num, * FROM dist_3 order by _shard_num;
foo 100 foo

View File

@ -3,6 +3,28 @@
-- make the order static
SET max_threads = 1;
DROP TABLE IF EXISTS mem1;
DROP TABLE IF EXISTS mem2;
DROP TABLE IF EXISTS mem3;
DROP TABLE IF EXISTS dist_1;
DROP TABLE IF EXISTS dist_2;
DROP TABLE IF EXISTS dist_3;
CREATE TABLE mem1 (key Int) Engine=Memory();
INSERT INTO mem1 VALUES (10);
CREATE TABLE dist_1 AS mem1 Engine=Distributed(test_shard_localhost, currentDatabase(), mem1);
INSERT INTO dist_1 VALUES (20);
CREATE TABLE mem2 (key Int) Engine=Memory();
INSERT INTO mem2 VALUES (100);
CREATE TABLE dist_2 AS mem2 Engine=Distributed(test_cluster_two_shards_localhost, currentDatabase(), mem2);
CREATE TABLE mem3 (key Int, _shard_num String) Engine=Memory();
INSERT INTO mem3 VALUES (100, 'foo');
CREATE TABLE dist_3 AS mem3 Engine=Distributed(test_shard_localhost, currentDatabase(), mem3);
-- { echoOn }
-- remote(system.one)
SELECT 'remote(system.one)';
SELECT * FROM remote('127.0.0.1', system.one);
@ -13,27 +35,20 @@ SELECT _shard_num, * FROM remote('127.0.0.{1,2}', system.one) WHERE _shard_num =
-- dist_1 using test_shard_localhost
SELECT 'dist_1';
CREATE TABLE mem1 (key Int) Engine=Memory();
CREATE TABLE dist_1 AS mem1 Engine=Distributed(test_shard_localhost, currentDatabase(), mem1);
SELECT _shard_num FROM dist_1 order by _shard_num;
INSERT INTO mem1 VALUES (10);
SELECT _shard_num FROM dist_1 order by _shard_num;
SELECT _shard_num, key FROM dist_1 order by _shard_num;
SELECT key FROM dist_1;
INSERT INTO dist_1 VALUES (20);
SELECT _shard_num FROM dist_1 order by _shard_num;
SELECT _shard_num, key FROM dist_1 order by _shard_num, key;
SELECT key FROM dist_1;
-- dist_2 using test_cluster_two_shards_localhost
SELECT 'dist_2';
CREATE TABLE mem2 (key Int) Engine=Memory();
CREATE TABLE dist_2 AS mem2 Engine=Distributed(test_cluster_two_shards_localhost, currentDatabase(), mem2);
SELECT _shard_num FROM dist_2 order by _shard_num;
INSERT INTO mem2 VALUES (100);
SELECT _shard_num FROM dist_2 order by _shard_num;
SELECT _shard_num, key FROM dist_2 order by _shard_num, key;
SELECT key FROM dist_2;
@ -57,8 +72,8 @@ JOIN system.clusters b
ON _shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 403 }
-- rewrite does not work with aliases, hence Missing columns (47)
SELECT a._shard_num, key FROM dist_1 a; -- { serverError 47; }
SELECT 'Rewrite with alias';
SELECT a._shard_num, key FROM dist_1 a;
-- the same with JOIN, just in case
SELECT a._shard_num, a.key, b.host_name, b.host_address IN ('::1', '127.0.0.1'), b.port
FROM dist_1 a
@ -67,8 +82,5 @@ ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError 47; }
SELECT 'dist_3';
CREATE TABLE mem3 (key Int, _shard_num String) Engine=Memory();
CREATE TABLE dist_3 AS mem3 Engine=Distributed(test_shard_localhost, currentDatabase(), mem3);
INSERT INTO mem3 VALUES (100, 'foo');
SELECT * FROM dist_3;
SELECT _shard_num, * FROM dist_3 order by _shard_num;

View File

@ -3,3 +3,7 @@ UTC 1234567891011 2009-02-13 23:31:31.011 1970-01-15 06:56:07.891011 1970-01-01
Asia/Makassar 1234567891011 2009-02-14 07:31:31.011 1970-01-15 14:56:07.891011 1970-01-01 08:20:34.567891011 DateTime64(9, \'Asia/Makassar\')
non-const column
1234567891011 2009-02-13 23:31:31.011 1970-01-15 06:56:07.891011 1970-01-01 00:20:34.567891011
upper range bound
9904447342 2283-11-10 19:22:22.123 2283-11-10 19:22:22.123456 1925-01-01 00:00:00.586094827
lower range bound
-1420066799 1925-01-01 01:00:01.123 1925-01-01 01:00:01.123456 1925-01-01 01:00:01.123456789

View File

@ -42,4 +42,30 @@ SELECT
i64,
fromUnixTimestamp64Milli(i64, tz),
fromUnixTimestamp64Micro(i64, tz),
fromUnixTimestamp64Nano(i64, tz) as dt64;
fromUnixTimestamp64Nano(i64, tz) as dt64;
SELECT 'upper range bound';
WITH
9904447342 AS timestamp,
CAST(9904447342123 AS Int64) AS milli,
CAST(9904447342123456 AS Int64) AS micro,
CAST(9904447342123456789 AS Int64) AS nano,
'UTC' AS tz
SELECT
timestamp,
fromUnixTimestamp64Milli(milli, tz),
fromUnixTimestamp64Micro(micro, tz),
fromUnixTimestamp64Nano(nano, tz);
SELECT 'lower range bound';
WITH
-1420066799 AS timestamp,
CAST(-1420066799123 AS Int64) AS milli,
CAST(-1420066799123456 AS Int64) AS micro,
CAST(-1420066799123456789 AS Int64) AS nano,
'UTC' AS tz
SELECT
timestamp,
fromUnixTimestamp64Milli(milli, tz),
fromUnixTimestamp64Micro(micro, tz),
fromUnixTimestamp64Nano(nano, tz);

View File

@ -1,14 +1,31 @@
-- { echo }
with anySimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(any, UInt64) 0
with anyLastSimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(anyLast, UInt64) 0
with minSimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(min, UInt64) 0
with maxSimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(max, UInt64) 0
with sumSimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(sum, UInt64) 0
with sumWithOverflowSimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(sumWithOverflow, UInt64) 0
with groupBitAndSimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(groupBitAnd, UInt64) 0
with groupBitOrSimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(groupBitOr, UInt64) 0
with groupBitXorSimpleState(number) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(groupBitXor, UInt64) 0
with sumMapSimpleState(([number], [number])) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(sumMap, Tuple(Array(UInt64), Array(UInt64))) ([],[])
with minMapSimpleState(([number], [number])) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(minMap, Tuple(Array(UInt64), Array(UInt64))) ([0],[0])
with maxMapSimpleState(([number], [number])) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(maxMap, Tuple(Array(UInt64), Array(UInt64))) ([0],[0])
with groupArrayArraySimpleState([number]) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(groupArrayArray, Array(UInt64)) [0]
with groupUniqArrayArraySimpleState([number]) as c select toTypeName(c), c from numbers(1);
SimpleAggregateFunction(groupUniqArrayArray, Array(UInt64)) [0]
-- non-SimpleAggregateFunction
with countSimpleState(number) as c select toTypeName(c), c from numbers(1); -- { serverError 36 }

View File

@ -1,3 +1,4 @@
-- { echo }
with anySimpleState(number) as c select toTypeName(c), c from numbers(1);
with anyLastSimpleState(number) as c select toTypeName(c), c from numbers(1);
with minSimpleState(number) as c select toTypeName(c), c from numbers(1);

View File

@ -1,3 +1,2 @@
SELECT positionCaseInsensitiveUTF8('иголка.ру', 'иголка.р<>\0') AS res;
SELECT positionCaseInsensitiveUTF8('иголка.ру', randomString(rand() % 100)) FROM system.numbers; -- { serverError 2 }
SELECT sum(ignore(positionCaseInsensitiveUTF8('иголка.ру', randomString(rand() % 2)))) FROM numbers(1000000);

View File

@ -0,0 +1,15 @@
ru
en
fr
ja
zh
un
{'ja':0.62,'fr':0.36}
{'ko':0.98}
{}
ISO-8859-1
en
0.465
-0.28823888
0.050505556
C++

View File

@ -0,0 +1,23 @@
-- Tags: no-fasttest
-- Tag no-fasttest: depends on cld2 and nlp-data
SET allow_experimental_nlp_functions = 1;
SELECT detectLanguage('Они сошлись. Волна и камень, Стихи и проза, лед и пламень, Не столь различны меж собой.');
SELECT detectLanguage('Sweet are the uses of adversity which, like the toad, ugly and venomous, wears yet a precious jewel in his head.');
SELECT detectLanguage('A vaincre sans peril, on triomphe sans gloire.');
SELECT detectLanguage('二兎を追う者は一兎をも得ず');
SELECT detectLanguage('有情饮水饱,无情食饭饥。');
SELECT detectLanguage('*****///// _____ ,,,,,,,, .....');
SELECT detectLanguageMixed('二兎を追う者は一兎をも得ず二兎を追う者は一兎をも得ず A vaincre sans peril, on triomphe sans gloire.');
SELECT detectLanguageMixed('어디든 가치가 있는 곳으로 가려면 지름길은 없다');
SELECT detectLanguageMixed('*****///// _____ ,,,,,,,, .....');
SELECT detectCharset('Plain English');
SELECT detectLanguageUnknown('Plain English');
SELECT detectTonality('милая кошка');
SELECT detectTonality('ненависть к людям');
SELECT detectTonality('обычная прогулка по ближайшему парку');
SELECT detectProgrammingLanguage('#include <iostream>');

View File

@ -0,0 +1,17 @@
-- { echo }
SELECT shardNum() AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY _shard_num;
2 1
1 1
SELECT shardNum() AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY shard_num;
2 1
1 1
SELECT _shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY _shard_num;
2 1
1 1
SELECT _shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY shard_num;
2 1
1 1
SELECT a._shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) a GROUP BY shard_num;
2 1
1 1
SELECT _shard_num FROM remote('127.1', system.one) AS a INNER JOIN (SELECT _shard_num FROM system.one) AS b USING (dummy); -- { serverError UNKNOWN_IDENTIFIER }

View File

@ -0,0 +1,7 @@
-- { echo }
SELECT shardNum() AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY _shard_num;
SELECT shardNum() AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY shard_num;
SELECT _shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY _shard_num;
SELECT _shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) GROUP BY shard_num;
SELECT a._shard_num AS shard_num, sum(1) as rows FROM remote('127.{1,2}', system, one) a GROUP BY shard_num;
SELECT _shard_num FROM remote('127.1', system.one) AS a INNER JOIN (SELECT _shard_num FROM system.one) AS b USING (dummy); -- { serverError UNKNOWN_IDENTIFIER }

View File

@ -0,0 +1,11 @@
[0]
(0,1)
[[0,1],[2,3]]
[(0,1),(2,3)]
[(0,1),(2,3)]
([0,1],(2,3),[4],(5,'a'),6,'b')
[0,1]
(0,1)
[[0,1],[2,3]]
[[0,1],[0,0]]
[[[0]],[[1],[2,3]]]

View File

@ -0,0 +1,11 @@
SELECT any(array(0)) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(tuple(0, 1)) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(array(array(0, 1), array(2, 3))) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(array(tuple(0, 1), tuple(2, 3))) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(array((0, 1), (2, 3))) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(tuple(array(0, 1), tuple(2, 3), [4], (5, 'a'), 6, 'b')) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(array(number, 1)) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(tuple(number, 1)) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(array(array(0, 1), [2, 3])) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any(array(array(0, 1), [number, number])) AS k FROM remote('127.0.0.{1,2}', numbers(10));
SELECT any([[[number]],[[number + 1], [number + 2, number + 3]]]) AS k FROM remote('127.0.0.{1,2}', numbers(10));

View File

@ -0,0 +1,7 @@
{"endUserIDs":{"_experience":{"aaid":{"id":"id_1","namespace":{"code":"code_1"},"primary":1},"mcid":{"id":"id_2","namespace":{"code":"code_2"},"primary":2}}}}
{"endUserIDs._experience":{"aaid":{"id":"id_1","namespace":{"code":"code_1"},"primary":1},"mcid":{"id":"id_2","namespace":{"code":"code_2"},"primary":2}}}
{"endUserIDs._experience.aaid":{"id":"id_1","namespace":{"code":"code_1"},"primary":1}}
{"endUserIDs._experience.aaid.id":"id_1"}
{"endUserIDs._experience.aaid.namespace":{"code":"code_1"}}
{"endUserIDs._experience.aaid.namespace.code":"code_1"}
{"endUserIDs._experience.aaid.primary":1}

View File

@ -0,0 +1,38 @@
DROP TABLE IF EXISTS t_nested_tuple;
CREATE TABLE t_nested_tuple
(
endUserIDs Tuple(
_experience Tuple(
aaid Tuple(
id Nullable(String),
namespace Tuple(
code LowCardinality(Nullable(String))
),
primary LowCardinality(Nullable(UInt8))
),
mcid Tuple(
id Nullable(String),
namespace Tuple(
code LowCardinality(Nullable(String))
),
primary LowCardinality(Nullable(UInt8))
)
)
)
)
ENGINE = MergeTree ORDER BY tuple();
SET output_format_json_named_tuples_as_objects = 1;
INSERT INTO t_nested_tuple FORMAT JSONEachRow {"endUserIDs":{"_experience":{"aaid":{"id":"id_1","namespace":{"code":"code_1"},"primary":1},"mcid":{"id":"id_2","namespace":{"code":"code_2"},"primary":2}}}};
SELECT * FROM t_nested_tuple FORMAT JSONEachRow;
SELECT endUserIDs._experience FROM t_nested_tuple FORMAT JSONEachRow;
SELECT endUserIDs._experience.aaid FROM t_nested_tuple FORMAT JSONEachRow;
SELECT endUserIDs._experience.aaid.id FROM t_nested_tuple FORMAT JSONEachRow;
SELECT endUserIDs._experience.aaid.namespace FROM t_nested_tuple FORMAT JSONEachRow;
SELECT endUserIDs._experience.aaid.namespace.code FROM t_nested_tuple FORMAT JSONEachRow;
SELECT endUserIDs._experience.aaid.primary FROM t_nested_tuple FORMAT JSONEachRow;
DROP TABLE t_nested_tuple;

View File

@ -0,0 +1,2 @@
something 1
something 1

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS test_02187;
CREATE TABLE test_02187 (
info String,
id Int32
)
ENGINE = ReplacingMergeTree(id)
ORDER BY id;
INSERT INTO TABLE test_02187 VALUES ('nothing', 1);
INSERT INTO TABLE test_02187 VALUES ('something', 1);
SELECT * FROM test_02187 FINAL;
SELECT * FROM test_02187 FINAL LIMIT 1;

View File

@ -1,3 +1,18 @@
## Download
Cloning the whole repo takes a lot of time and disk space. The following commands download only this directory.
* Requires Git 2.19
```
# mkdir chdiag
# cd chdiag
# git clone --depth 1 --filter=blob:none --no-checkout https://github.com/ClickHouse/ClickHouse
# cd ClickHouse
# git sparse-checkout set utils/clickhouse-diagnostics
# git checkout master -- utils/clickhouse-diagnostics
```
## Installation
```