Merge remote-tracking branch 'refs/remotes/upstream/master' into vfs

This commit is contained in:
Alexander Burmak 2019-12-09 16:35:02 +03:00
commit 5a93441445
52 changed files with 899 additions and 279 deletions

View File

@ -382,16 +382,12 @@ add_subdirectory (contrib EXCLUDE_FROM_ALL)
macro (add_executable target)
# invoke built-in add_executable
_add_executable (${ARGV})
# explicitly acquire and interpose malloc symbols via clickhouse_malloc
_add_executable (${ARGV} $<TARGET_OBJECTS:clickhouse_malloc>)
get_target_property (type ${target} TYPE)
if (${type} STREQUAL EXECUTABLE)
file (RELATIVE_PATH dir ${CMAKE_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
if (${dir} MATCHES "^dbms")
# Only interpose operator new/delete for dbms executables (MemoryTracker stuff)
target_link_libraries (${target} PRIVATE clickhouse_new_delete ${MALLOC_LIBRARIES})
else ()
target_link_libraries (${target} PRIVATE ${MALLOC_LIBRARIES})
endif ()
# operator new/delete for executables (MemoryTracker stuff)
target_link_libraries (${target} PRIVATE clickhouse_new_delete ${MALLOC_LIBRARIES})
endif()
endmacro()

View File

@ -20,9 +20,11 @@ else ()
message (WARNING "You are using an unsupported compiler. Compilation has only been tested with Clang 6+ and GCC 7+.")
endif ()
option (LINKER_NAME "Linker name or full path")
STRING(REGEX MATCHALL "[0-9]+" COMPILER_VERSION_LIST ${CMAKE_CXX_COMPILER_VERSION})
LIST(GET COMPILER_VERSION_LIST 0 COMPILER_VERSION_MAJOR)
find_program (LLD_PATH NAMES "ld.lld" "lld")
option (LINKER_NAME "Linker name or full path")
find_program (LLD_PATH NAMES "ld.lld" "lld" "lld-${COMPILER_VERSION_MAJOR}")
find_program (GOLD_PATH NAMES "ld.gold" "gold")
if (NOT LINKER_NAME)

View File

@ -100,7 +100,7 @@ set(dbms_sources)
add_headers_and_sources(clickhouse_common_io src/Common)
add_headers_and_sources(clickhouse_common_io src/Common/HashTable)
add_headers_and_sources(clickhouse_common_io src/IO)
list (REMOVE_ITEM clickhouse_common_io_sources src/Common/new_delete.cpp)
list (REMOVE_ITEM clickhouse_common_io_sources src/Common/malloc.cpp src/Common/new_delete.cpp)
if(USE_RDKAFKA)
add_headers_and_sources(dbms src/Storages/Kafka)
@ -140,6 +140,9 @@ endif ()
add_library(clickhouse_common_io ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
add_library (clickhouse_malloc OBJECT src/Common/malloc.cpp)
set_source_files_properties(src/Common/malloc.cpp PROPERTIES COMPILE_FLAGS "-fno-builtin")
add_library (clickhouse_new_delete STATIC src/Common/new_delete.cpp)
target_link_libraries (clickhouse_new_delete PRIVATE clickhouse_common_io)

View File

@ -30,11 +30,6 @@ if (Poco_Data_FOUND)
set(CLICKHOUSE_ODBC_BRIDGE_LINK ${CLICKHOUSE_ODBC_BRIDGE_LINK} PRIVATE ${Poco_Data_LIBRARY})
set(CLICKHOUSE_ODBC_BRIDGE_INCLUDE ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} SYSTEM PRIVATE ${Poco_Data_INCLUDE_DIR})
endif ()
if (USE_JEMALLOC)
# We need to link jemalloc directly to odbc-bridge-library, because in other case
# we will build it with default malloc.
set(CLICKHOUSE_ODBC_BRIDGE_LINK ${CLICKHOUSE_ODBC_BRIDGE_LINK} PRIVATE ${JEMALLOC_LIBRARIES})
endif()
clickhouse_program_add_library(odbc-bridge)

View File

@ -200,6 +200,8 @@ void TCPHandler::runImpl()
/// So, the stream has been marked as cancelled and we can't read from it anymore.
state.block_in.reset();
state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
state.temporary_tables_read = true;
});
/// Send structure of columns to client for function input()
@ -339,6 +341,18 @@ void TCPHandler::runImpl()
LOG_WARNING(log, "Client has gone away.");
}
try
{
if (exception && !state.temporary_tables_read)
query_context->initializeExternalTablesIfSet();
}
catch (...)
{
network_error = true;
LOG_WARNING(log, "Can't read external tables after query failure.");
}
try
{
query_scope.reset();

View File

@ -63,6 +63,8 @@ struct QueryState
bool sent_all_data = false;
/// Request requires data from the client (INSERT, but not INSERT SELECT).
bool need_receive_data_for_insert = false;
/// Whether the external (temporary) tables were already read
bool temporary_tables_read = false;
/// Request requires data from client for function input()
bool need_receive_data_for_input = false;

View File

@ -4,6 +4,9 @@
#if USE_ICU
#include <unicode/ucol.h>
#include <unicode/unistr.h>
#include <unicode/locid.h>
#include <unicode/ucnv.h>
#else
#ifdef __clang__
#pragma clang diagnostic ignored "-Wunused-private-field"
@ -14,6 +17,7 @@
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <Poco/String.h>
#include <algorithm>
namespace DB
@ -26,16 +30,81 @@ namespace DB
}
}
Collator::Collator(const std::string & locale_) : locale(Poco::toLower(locale_))
AvailableCollationLocales::AvailableCollationLocales()
{
#if USE_ICU
static const size_t MAX_LANG_LENGTH = 128;
size_t available_locales_count = ucol_countAvailable();
for (size_t i = 0; i < available_locales_count; ++i)
{
std::string locale_name = ucol_getAvailable(i);
UChar lang_buffer[MAX_LANG_LENGTH];
char normal_buf[MAX_LANG_LENGTH];
UErrorCode status = U_ZERO_ERROR;
/// All names will be in English
size_t lang_length = uloc_getDisplayLanguage(
locale_name.c_str(), "en", lang_buffer, MAX_LANG_LENGTH, &status);
std::optional<std::string> lang;
if (!U_FAILURE(status))
{
/// Convert the language name from a UChar array to a plain char array.
/// Since we request names in English, each UChar fits into a single char.
u_UCharsToChars(lang_buffer, normal_buf, lang_length);
lang.emplace(std::string(normal_buf, lang_length));
}
locales_map.emplace(Poco::toLower(locale_name), LocaleAndLanguage{locale_name, lang});
}
#endif
}
const AvailableCollationLocales & AvailableCollationLocales::instance()
{
static AvailableCollationLocales instance;
return instance;
}
AvailableCollationLocales::LocalesVector AvailableCollationLocales::getAvailableCollations() const
{
LocalesVector result;
for (const auto & name_and_locale : locales_map)
result.push_back(name_and_locale.second);
auto comparator = [] (const LocaleAndLanguage & f, const LocaleAndLanguage & s)
{
return f.locale_name < s.locale_name;
};
std::sort(result.begin(), result.end(), comparator);
return result;
}
bool AvailableCollationLocales::isCollationSupported(const std::string & locale_name) const
{
/// Locale names are matched case-insensitively, so convert everything to lower case
return locales_map.count(Poco::toLower(locale_name));
}
Collator::Collator(const std::string & locale_)
: locale(Poco::toLower(locale_))
{
#if USE_ICU
/// Check the locale here, because ucol_open falls back to the default locale
/// for almost any unknown name.
if (!AvailableCollationLocales::instance().isCollationSupported(locale))
throw DB::Exception("Unsupported collation locale: " + locale, DB::ErrorCodes::UNSUPPORTED_COLLATION_LOCALE);
UErrorCode status = U_ZERO_ERROR;
collator = ucol_open(locale.c_str(), &status);
if (status != U_ZERO_ERROR)
if (U_FAILURE(status))
{
ucol_close(collator);
throw DB::Exception("Unsupported collation locale: " + locale, DB::ErrorCodes::UNSUPPORTED_COLLATION_LOCALE);
throw DB::Exception("Failed to open locale: " + locale + " with error: " + u_errorName(status), DB::ErrorCodes::UNSUPPORTED_COLLATION_LOCALE);
}
#else
throw DB::Exception("Collations support is disabled, because ClickHouse was built without ICU library", DB::ErrorCodes::SUPPORT_IS_DISABLED);
@ -60,8 +129,8 @@ int Collator::compare(const char * str1, size_t length1, const char * str2, size
UErrorCode status = U_ZERO_ERROR;
UCollationResult compare_result = ucol_strcollIter(collator, &iter1, &iter2, &status);
if (status != U_ZERO_ERROR)
throw DB::Exception("ICU collation comparison failed with error code: " + DB::toString<int>(status),
if (U_FAILURE(status))
throw DB::Exception("ICU collation comparison failed with error code: " + std::string(u_errorName(status)),
DB::ErrorCodes::COLLATION_COMPARISON_FAILED);
/** Values of enum UCollationResult are exactly what we need:
@ -83,14 +152,3 @@ const std::string & Collator::getLocale() const
{
return locale;
}
std::vector<std::string> Collator::getAvailableCollations()
{
std::vector<std::string> result;
#if USE_ICU
size_t available_locales_count = ucol_countAvailable();
for (size_t i = 0; i < available_locales_count; ++i)
result.push_back(ucol_getAvailable(i));
#endif
return result;
}
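With this validation in place, an unknown locale now fails fast instead of silently falling back to ICU's default. A minimal illustration, mirroring queries from the collation test updated in this commit (requires a build with ICU):

```sql
SELECT arrayJoin(['а', 'я', 'ё']) AS x ORDER BY x COLLATE 'ru'; -- supported locale
SELECT '' AS x ORDER BY x COLLATE 'qq'; -- {serverError 186} UNSUPPORTED_COLLATION_LOCALE
```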

View File

@ -3,9 +3,39 @@
#include <string>
#include <vector>
#include <boost/noncopyable.hpp>
#include <unordered_map>
struct UCollator;
/// Class represents available locales for collations.
class AvailableCollationLocales : private boost::noncopyable
{
public:
struct LocaleAndLanguage
{
std::string locale_name; /// ISO locale code
std::optional<std::string> language; /// full language name in English
};
using AvailableLocalesMap = std::unordered_map<std::string, LocaleAndLanguage>;
using LocalesVector = std::vector<LocaleAndLanguage>;
static const AvailableCollationLocales & instance();
/// Get all collations with names in sorted order
LocalesVector getAvailableCollations() const;
/// Check that collation is supported
bool isCollationSupported(const std::string & locale_name) const;
private:
AvailableCollationLocales();
private:
AvailableLocalesMap locales_map;
};
class Collator : private boost::noncopyable
{
public:
@ -15,10 +45,8 @@ public:
int compare(const char * str1, size_t length1, const char * str2, size_t length2) const;
const std::string & getLocale() const;
static std::vector<std::string> getAvailableCollations();
private:
std::string locale;
UCollator * collator;
};

View File

@ -0,0 +1,40 @@
#if defined(OS_LINUX)
#include <stdlib.h>
/// Interposing these symbols explicitly. The idea works like this: malloc.cpp compiles to a
/// dedicated object (namely clickhouse_malloc.o) that appears earlier in the link command than
/// malloc libraries like libjemalloc.a. As a result, the linker resolves these symbols first.
extern "C"
{
void *malloc(size_t size);
void free(void *ptr);
void *calloc(size_t nmemb, size_t size);
void *realloc(void *ptr, size_t size);
int posix_memalign(void **memptr, size_t alignment, size_t size);
void *aligned_alloc(size_t alignment, size_t size);
void *valloc(size_t size);
void *memalign(size_t alignment, size_t size);
void *pvalloc(size_t size);
}
template<typename T>
inline void ignore(T x __attribute__((unused)))
{
}
static void dummyFunctionForInterposing() __attribute__((used));
static void dummyFunctionForInterposing()
{
void* dummy;
/// Suppression for PVS-Studio.
free(nullptr); // -V575
ignore(malloc(0)); // -V575
ignore(calloc(0, 0)); // -V575
ignore(realloc(nullptr, 0)); // -V575
ignore(posix_memalign(&dummy, 0, 0)); // -V575
ignore(aligned_alloc(0, 0)); // -V575
ignore(valloc(0)); // -V575
ignore(memalign(0, 0)); // -V575
ignore(pvalloc(0)); // -V575
}
#endif

View File

@ -143,7 +143,7 @@ ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Float64:
column_type = ColumnType::MYSQL_TYPE_TINY;
column_type = ColumnType::MYSQL_TYPE_DOUBLE;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Date:
@ -155,8 +155,6 @@ ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::String:
column_type = ColumnType::MYSQL_TYPE_STRING;
break;
case TypeIndex::FixedString:
column_type = ColumnType::MYSQL_TYPE_STRING;
break;
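The Float64 fix above means MySQL clients now see Float64 results typed as MYSQL_TYPE_DOUBLE rather than MYSQL_TYPE_TINY. A quick way to observe this from any MySQL client, using a query borrowed from the test_types integration test added below:

```sql
SELECT toFloat64(1.5) AS Float64_column; -- column metadata now reports MYSQL_TYPE_DOUBLE
```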

View File

@ -52,7 +52,7 @@ struct Less
{
for (auto it = left_columns.begin(), jt = right_columns.begin(); it != left_columns.end(); ++it, ++jt)
{
int res = it->second.direction * it->first->compareAt(a, b, *jt->first, it->second.nulls_direction);
int res = it->description.direction * it->column->compareAt(a, b, *jt->column, it->description.nulls_direction);
if (res < 0)
return true;
else if (res > 0)

View File

@ -263,6 +263,11 @@ protected:
*/
bool checkTimeLimit();
#ifndef NDEBUG
bool read_prefix_is_called = false;
bool read_suffix_is_called = false;
#endif
private:
bool enabled_extremes = false;
@ -315,10 +320,6 @@ private:
return;
}
#ifndef NDEBUG
bool read_prefix_is_called = false;
bool read_suffix_is_called = false;
#endif
};
}

View File

@ -57,12 +57,19 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
}
}
// Also resets a few members of IBlockInputStream (to avoid propagating resetParser up there)
void NativeBlockInputStream::resetParser()
{
istr_concrete = nullptr;
use_index = false;
header.clear();
avg_value_size_hints.clear();
#ifndef NDEBUG
read_prefix_is_called = false;
read_suffix_is_called = false;
#endif
is_cancelled.store(false);
is_killed.store(false);
}
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)

View File

@ -5,6 +5,8 @@
#include <IO/ConnectionTimeouts.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPRequest.h>
#include <common/logger_useful.h>
@ -82,12 +84,9 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri)
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
char buffer[80];
struct tm * timeinfo;
timeinfo = localtime(&hr_time);
strftime(buffer, 80, "%Y-%m-%d %H:%M:%S", timeinfo);
std::string str_time(buffer);
uri.addQueryParameter(update_field, str_time);
WriteBufferFromOwnString out;
writeDateTimeText(hr_time, out);
uri.addQueryParameter(update_field, out.str());
}
else
{

View File

@ -20,6 +20,7 @@ void registerFunctionsJSON(FunctionFactory & factory)
factory.registerFunction<FunctionJSON<NameJSONExtract, JSONExtractImpl>>();
factory.registerFunction<FunctionJSON<NameJSONExtractKeysAndValues, JSONExtractKeysAndValuesImpl>>();
factory.registerFunction<FunctionJSON<NameJSONExtractRaw, JSONExtractRawImpl>>();
factory.registerFunction<FunctionJSON<NameJSONExtractArrayRaw, JSONExtractArrayRawImpl>>();
}
}

View File

@ -291,6 +291,7 @@ struct NameJSONExtractString { static constexpr auto name{"JSONExtractString"};
struct NameJSONExtract { static constexpr auto name{"JSONExtract"}; };
struct NameJSONExtractKeysAndValues { static constexpr auto name{"JSONExtractKeysAndValues"}; };
struct NameJSONExtractRaw { static constexpr auto name{"JSONExtractRaw"}; };
struct NameJSONExtractArrayRaw { static constexpr auto name{"JSONExtractArrayRaw"}; };
template <typename JSONParser>
@ -1088,4 +1089,39 @@ private:
}
};
template <typename JSONParser>
class JSONExtractArrayRawImpl
{
public:
static DataTypePtr getType(const char *, const ColumnsWithTypeAndName &)
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
}
using Iterator = typename JSONParser::Iterator;
static bool addValueToColumn(IColumn & dest, const Iterator & it)
{
if (!JSONParser::isArray(it))
{
return false;
}
ColumnArray & col_res = assert_cast<ColumnArray &>(dest);
Iterator array_it = it;
size_t size = 0;
if (JSONParser::firstArrayElement(array_it))
{
do
{
JSONExtractRawImpl<JSONParser>::addValueToColumn(col_res.getData(), array_it);
++size;
} while (JSONParser::nextArrayElement(array_it));
}
col_res.getOffsets().push_back(col_res.getOffsets().back() + size);
return true;
}
static constexpr size_t num_extra_arguments = 0;
static void prepare(const char *, const Block &, const ColumnNumbers &, size_t) {}
};
}

View File

@ -1,7 +1,9 @@
#include <Interpreters/sortBlock.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnConst.h>
#include <Common/typeid_cast.h>
#include <Functions/FunctionHelpers.h>
#include <pdqsort.h>
@ -13,16 +15,9 @@ namespace ErrorCodes
extern const int BAD_COLLATION;
}
static inline bool needCollation(const IColumn * column, const SortColumnDescription & description)
static bool isCollationRequired(const SortColumnDescription & description)
{
if (!description.collator)
return false;
if (!typeid_cast<const ColumnString *>(column)) /// TODO Nullable(String)
throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
return true;
return description.collator != nullptr;
}
@ -38,7 +33,7 @@ ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, c
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column, description[i]);
res.emplace_back(ColumnWithSortDescription{column, description[i], isColumnConst(*column)});
}
return res;
@ -55,7 +50,11 @@ struct PartialSortingLess
{
for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it)
{
int res = it->second.direction * it->first->compareAt(a, b, *it->first, it->second.nulls_direction);
int res;
if (it->column_const)
res = 0;
else
res = it->description.direction * it->column->compareAt(a, b, *it->column, it->description.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
@ -70,22 +69,29 @@ struct PartialSortingLessWithCollation
{
const ColumnsWithSortDescriptions & columns;
explicit PartialSortingLessWithCollation(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {}
explicit PartialSortingLessWithCollation(const ColumnsWithSortDescriptions & columns_)
: columns(columns_)
{
}
bool operator() (size_t a, size_t b) const
{
for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it)
{
int res;
if (needCollation(it->first, it->second))
if (it->column_const)
{
const ColumnString & column_string = typeid_cast<const ColumnString &>(*it->first);
res = column_string.compareAtWithCollation(a, b, *it->first, *it->second.collator);
res = 0;
}
else if (isCollationRequired(it->description))
{
const ColumnString & column_string = assert_cast<const ColumnString &>(*it->column);
res = column_string.compareAtWithCollation(a, b, *it->column, *it->description.collator);
}
else
res = it->first->compareAt(a, b, *it->first, it->second.nulls_direction);
res *= it->second.direction;
res = it->column->compareAt(a, b, *it->column, it->description.nulls_direction);
res *= it->description.direction;
if (res < 0)
return true;
else if (res > 0)
@ -100,27 +106,44 @@ void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
if (!block)
return;
/// If only one column to sort by
if (description.size() == 1)
{
IColumn::Permutation perm;
bool reverse = description[0].direction == -1;
const IColumn * column = !description[0].column_name.empty()
? block.getByName(description[0].column_name).column.get()
: block.safeGetByPosition(description[0].column_number).column.get();
IColumn::Permutation perm;
if (needCollation(column, description[0]))
bool is_column_const = false;
if (isCollationRequired(description[0]))
{
const ColumnString & column_string = typeid_cast<const ColumnString &>(*column);
column_string.getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
/// If it's a real String column, then we need to sort it
if (const ColumnString * column_string = checkAndGetColumn<ColumnString>(column))
column_string->getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
else if (checkAndGetColumnConstData<ColumnString>(column))
is_column_const = true;
else
throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
}
else
else if (!isColumnConst(*column))
column->getPermutation(reverse, limit, description[0].nulls_direction, perm);
else
/// We don't need to do anything with a const column
is_column_const = true;
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
{
if (!is_column_const)
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
else if (limit != 0) // LIMIT exists
block.getByPosition(i).column = block.getByPosition(i).column->cut(0, limit);
}
}
else
{
@ -137,10 +160,13 @@ void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
for (size_t i = 0, num_sort_columns = description.size(); i < num_sort_columns; ++i)
{
if (needCollation(columns_with_sort_desc[i].first, description[i]))
const IColumn * column = columns_with_sort_desc[i].column;
if (isCollationRequired(description[i]))
{
if (!checkAndGetColumn<ColumnString>(column) && !checkAndGetColumnConstData<ColumnString>(column))
throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
need_collation = true;
break;
}
}
@ -165,7 +191,9 @@ void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
{
block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
}
}
}
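The const-column handling above is exercised by queries added to the collation test in this commit: constant expressions in ORDER BY are now skipped during comparisons instead of triggering the String-only collation check:

```sql
SELECT number FROM numbers(2) ORDER BY 'x' COLLATE 'el'; -- order by a const with collation
SELECT number FROM numbers(11) ORDER BY 'x', toString(number), 'y' COLLATE 'el'; -- const and non-const mixed
```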

View File

@ -29,7 +29,17 @@ void stableGetPermutation(const Block & block, const SortDescription & descripti
*/
bool isAlreadySorted(const Block & block, const SortDescription & description);
using ColumnsWithSortDescriptions = std::vector<std::pair<const IColumn *, SortColumnDescription>>;
/// A column together with its sort description
struct ColumnWithSortDescription
{
const IColumn * column;
SortColumnDescription description;
/// Whether this column is a ColumnConst
bool column_const = false;
};
using ColumnsWithSortDescriptions = std::vector<ColumnWithSortDescription>;
ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description);

View File

@ -2,9 +2,7 @@
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/parseDatabaseAndTableName.h>
#include <Interpreters/evaluateConstantExpression.h>
namespace ErrorCodes
@ -19,7 +17,7 @@ namespace DB
bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
if (!ParserKeyword{"SYSTEM"}.ignore(pos))
if (!ParserKeyword{"SYSTEM"}.ignore(pos, expected))
return false;
using Type = ASTSystemQuery::Type;
@ -30,7 +28,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
for (int i = static_cast<int>(Type::UNKNOWN) + 1; i < static_cast<int>(Type::END); ++i)
{
Type t = static_cast<Type>(i);
if (ParserKeyword{ASTSystemQuery::typeToString(t)}.ignore(pos))
if (ParserKeyword{ASTSystemQuery::typeToString(t)}.ignore(pos, expected))
{
res->type = t;
found = true;

View File

@ -6,7 +6,6 @@
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
namespace DB
{
KafkaBlockInputStream::KafkaBlockInputStream(
@ -66,20 +65,8 @@ Block KafkaBlockInputStream::readImpl()
MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto read_callback = [&]
{
virtual_columns[0]->insert(buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(buffer->currentKey()); // "key"
virtual_columns[2]->insert(buffer->currentOffset()); // "offset"
virtual_columns[3]->insert(buffer->currentPartition()); // "partition"
auto timestamp = buffer->currentTimestamp();
if (timestamp)
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
};
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
@ -106,13 +93,17 @@ Block KafkaBlockInputStream::readImpl()
case IProcessor::Status::PortFull:
{
auto chunk = port.pull();
new_rows = new_rows + chunk.getNumRows();
/// FIXME: materialize MATERIALIZED columns here.
// This was returning a bad value before https://github.com/ClickHouse/ClickHouse/pull/8005;
// if backported, it should go together with #8005.
auto chunk_rows = chunk.getNumRows();
new_rows += chunk_rows;
auto columns = chunk.detachColumns();
for (size_t i = 0, s = columns.size(); i < s; ++i)
{
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
}
break;
}
case IProcessor::Status::NeedData:
@ -125,18 +116,55 @@ Block KafkaBlockInputStream::readImpl()
};
size_t total_rows = 0;
while (total_rows < max_block_size)
while (true)
{
// Some formats (like RowBinaryWithNamesAndTypes / CSVWithNames)
// throw an exception from readPrefix when the buffer is empty
if (buffer->eof())
break;
auto new_rows = read_kafka_message();
auto _topic = buffer->currentTopic();
auto _key = buffer->currentKey();
auto _offset = buffer->currentOffset();
auto _partition = buffer->currentPartition();
auto _timestamp_raw = buffer->currentTimestamp();
auto _timestamp = _timestamp_raw ? std::chrono::duration_cast<std::chrono::seconds>(_timestamp_raw->get_timestamp()).count()
: 0;
for (size_t i = 0; i < new_rows; ++i)
{
virtual_columns[0]->insert(_topic);
virtual_columns[1]->insert(_key);
virtual_columns[2]->insert(_offset);
virtual_columns[3]->insert(_partition);
if (_timestamp_raw)
{
virtual_columns[4]->insert(_timestamp);
}
else
{
virtual_columns[4]->insertDefault();
}
}
total_rows = total_rows + new_rows;
buffer->allowNext();
if (!new_rows || !checkTimeLimit())
if (!new_rows || total_rows >= max_block_size || !checkTimeLimit())
break;
}
if (total_rows == 0)
return Block();
/// MATERIALIZED columns could be added here, but they are not needed
/// and would be misleading: columns 'materialized' that way stay ephemeral,
/// i.e. they are not stored anywhere.
/// If extra columns are needed, they can be added at the MV level using DEFAULT.
auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
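With the per-message assignment above, virtual column values stay aligned with rows even when a single Kafka message expands into several rows. A hedged usage sketch (assuming a Kafka-engine table named `kafka_queue`; the underscore-prefixed virtual columns correspond to the "topic"/"key"/"offset"/"partition"/"timestamp" columns filled here):

```sql
SELECT _topic, _key, _offset, _partition, _timestamp FROM kafka_queue LIMIT 10;
```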

View File

@ -17,6 +17,7 @@ namespace DB
MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part)
: database{database_}, table{table_}, partition_id{future_part.part_info.partition_id}
, result_part_name{future_part.name}
, result_part_path{future_part.path}
, result_data_version{future_part.part_info.getDataVersion()}
, num_parts{future_part.parts.size()}
, thread_number{getThreadNumber()}
@ -24,6 +25,7 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
for (const auto & source_part : future_part.parts)
{
source_part_names.emplace_back(source_part->name);
source_part_paths.emplace_back(source_part->getFullPath());
std::shared_lock<std::shared_mutex> part_lock(source_part->columns_lock);
@ -54,6 +56,7 @@ MergeInfo MergeListElement::getInfo() const
res.database = database;
res.table = table;
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
res.partition_id = partition_id;
res.is_mutation = is_mutation;
res.elapsed = watch.elapsedSeconds();
@ -73,6 +76,9 @@ MergeInfo MergeListElement::getInfo() const
for (const auto & source_part_name : source_part_names)
res.source_part_names.emplace_back(source_part_name);
for (const auto & source_part_path : source_part_paths)
res.source_part_paths.emplace_back(source_part_path);
return res;
}

View File

@ -28,7 +28,9 @@ struct MergeInfo
std::string database;
std::string table;
std::string result_part_name;
std::string result_part_path;
Array source_part_names;
Array source_part_paths;
std::string partition_id;
bool is_mutation;
Float64 elapsed;
@ -55,11 +57,13 @@ struct MergeListElement : boost::noncopyable
std::string partition_id;
const std::string result_part_name;
const std::string result_part_path;
Int64 result_data_version{};
bool is_mutation{};
UInt64 num_parts{};
Names source_part_names;
Names source_part_paths;
Int64 source_data_version{};
Stopwatch watch;

View File

@ -120,6 +120,11 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
name = part_info.getPartName();
}
void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const ReservationPtr & reservation)
{
path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
}
MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size_)
: data(data_), background_pool_size(background_pool_size_), log(&Logger::get(data.getLogName() + " (MergerMutator)"))
{

View File

@ -17,6 +17,7 @@ class MergeProgressCallback;
struct FutureMergedMutatedPart
{
String name;
String path;
MergeTreePartInfo part_info;
MergeTreeData::DataPartsVector parts;
@ -29,6 +30,7 @@ struct FutureMergedMutatedPart
}
void assign(MergeTreeData::DataPartsVector parts_);
void updatePath(const MergeTreeData & storage, const ReservationPtr & reservation);
};

View File

@ -581,7 +581,6 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
void MergeTreeDataPart::loadIndexGranularity()
{
String full_path = getFullPath();
index_granularity_info.changeGranularityIfRequired(full_path);

View File

@ -23,6 +23,7 @@ namespace DB
struct ColumnSize;
class MergeTreeData;
struct FutureMergedMutatedPart;
/// Description of the data part.

View File

@ -343,7 +343,7 @@ struct CurrentlyMergingPartsTagger
StorageMergeTree & storage;
public:
CurrentlyMergingPartsTagger(const FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation)
CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation)
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
@ -361,6 +361,8 @@ public:
throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE);
}
future_part_.updatePath(storage, reserved_space);
for (const auto & part : future_part.parts)
{
if (storage.currently_merging_mutating_parts.count(part))

View File

@ -1016,6 +1016,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
throw Exception("Future merged part name " + backQuote(future_merged_part.name) + " differs from part name in log entry: "
+ backQuote(entry.new_part_name), ErrorCodes::BAD_DATA_PART_NAME);
}
future_merged_part.updatePath(*this, reserved_space);
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_merged_part);
@ -1156,6 +1157,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
future_mutated_part.parts.push_back(source_part);
future_mutated_part.part_info = new_part_info;
future_mutated_part.name = entry.new_part_name;
future_mutated_part.updatePath(*this, reserved_space);
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(
database_name, table_name, future_mutated_part);

View File

@ -1,5 +1,6 @@
#include <Columns/Collator.h>
#include <Storages/System/StorageSystemCollations.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -8,13 +9,17 @@ NamesAndTypesList StorageSystemCollations::getNamesAndTypes()
{
return {
{"name", std::make_shared<DataTypeString>()},
{"language", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
};
}
void StorageSystemCollations::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
for (const auto & collation_name : Collator::getAvailableCollations())
res_columns[0]->insert(collation_name);
for (const auto & [locale, lang]: AvailableCollationLocales::instance().getAvailableCollations())
{
res_columns[0]->insert(locale);
res_columns[1]->insert(lang ? *lang : Field());
}
}
}
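With the new `language` column in place, the available locales can be inspected directly; the language is NULL when ICU has no English display name for it:

```sql
SELECT name, language FROM system.collations;
```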

View File

@ -16,6 +16,8 @@ NamesAndTypesList StorageSystemMerges::getNamesAndTypes()
{"num_parts", std::make_shared<DataTypeUInt64>()},
{"source_part_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"result_part_name", std::make_shared<DataTypeString>()},
{"source_part_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"result_part_path", std::make_shared<DataTypeString>()},
{"partition_id", std::make_shared<DataTypeString>()},
{"is_mutation", std::make_shared<DataTypeUInt8>()},
{"total_size_bytes_compressed", std::make_shared<DataTypeUInt64>()},
@ -45,6 +47,8 @@ void StorageSystemMerges::fillData(MutableColumns & res_columns, const Context &
res_columns[i++]->insert(merge.num_parts);
res_columns[i++]->insert(merge.source_part_names);
res_columns[i++]->insert(merge.result_part_name);
res_columns[i++]->insert(merge.source_part_paths);
res_columns[i++]->insert(merge.result_part_path);
res_columns[i++]->insert(merge.partition_id);
res_columns[i++]->insert(merge.is_mutation);
res_columns[i++]->insert(merge.total_size_bytes_compressed);
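The new columns let running merges be traced to concrete filesystem paths. The integration test added in this commit monitors them like this:

```sql
SELECT database, table, num_parts, source_part_names, source_part_paths,
       result_part_name, result_part_path, partition_id, is_mutation
FROM system.merges;
```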

View File

@ -6,7 +6,7 @@ Note: We use Address Sanitizer to run functional tests for every commit automati
mkdir build_asan && cd build_asan
```
Note: using clang instead of gcc is strongly recommended.
Note: using clang instead of gcc is strongly recommended. Make sure you have installed the required packages (`clang`, `lld`). It may be necessary to specify a non-standard `lld` binary via the `LINKER_NAME` option (e.g. `-D LINKER_NAME=lld-8`).
```
CC=clang CXX=clang++ cmake -D SANITIZE=address ..
@ -67,5 +67,5 @@ sudo -u clickhouse UBSAN_OPTIONS='print_stacktrace=1' ./clickhouse-ubsan server
# How to use Memory Sanitizer
```
CC=clang-8 CXX=clang++-8 cmake -D ENABLE_HDFS=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_EMBEDDED_COMPILER=0 -D USE_INTERNAL_CAPNP_LIBRARY=0 -D USE_SIMDJSON=0 -DENABLE_READLINE=0 -D SANITIZE=memory ..
CC=clang-8 CXX=clang++-8 cmake -D ENABLE_HDFS=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_EMBEDDED_COMPILER=0 -D USE_INTERNAL_CAPNP_LIBRARY=0 -D USE_SIMDJSON=0 -D ENABLE_READLINE=0 -D SANITIZE=memory ..
```

View File

@ -1,11 +1,14 @@
# coding: utf-8
import datetime
import math
import os
import docker
import pytest
import subprocess
import pymysql.connections
import time
import docker
import pymysql.connections
from docker.models.containers import Container
from helpers.cluster import ClickHouseCluster
@ -243,3 +246,58 @@ def test_mysqljs_client(server_address, nodejs_container):
code, (_, _) = nodejs_container.exec_run('node test.js {host} {port} user_with_empty_password 123'.format(host=server_address, port=server_port), demux=True)
assert code == 1
def test_types(server_address):
client = pymysql.connections.Connection(host=server_address, user='default', password='123', database='default', port=server_port)
cursor = client.cursor(pymysql.cursors.DictCursor)
cursor.execute(
"select "
"toInt8(-pow(2, 7)) as Int8_column, "
"toUInt8(pow(2, 8) - 1) as UInt8_column, "
"toInt16(-pow(2, 15)) as Int16_column, "
"toUInt16(pow(2, 16) - 1) as UInt16_column, "
"toInt32(-pow(2, 31)) as Int32_column, "
"toUInt32(pow(2, 32) - 1) as UInt32_column, "
"toInt64('-9223372036854775808') as Int64_column, " # -2^63
"toUInt64('18446744073709551615') as UInt64_column, " # 2^64 - 1
"'тест' as String_column, "
"toFixedString('тест', 8) as FixedString_column, "
"toFloat32(1.5) as Float32_column, "
"toFloat64(1.5) as Float64_column, "
"toFloat32(NaN) as Float32_NaN_column, "
"-Inf as Float64_Inf_column, "
"toDate('2019-12-08') as Date_column, "
"toDate('1970-01-01') as Date_min_column, "
"toDate('1970-01-02') as Date_after_min_column, "
"toDateTime('2019-12-08 08:24:03') as DateTime_column"
)
result = cursor.fetchall()[0]
expected = [
('Int8_column', -2 ** 7),
('UInt8_column', 2 ** 8 - 1),
('Int16_column', -2 ** 15),
('UInt16_column', 2 ** 16 - 1),
('Int32_column', -2 ** 31),
('UInt32_column', 2 ** 32 - 1),
('Int64_column', -2 ** 63),
('UInt64_column', 2 ** 64 - 1),
('String_column', 'тест'),
('FixedString_column', 'тест'),
('Float32_column', 1.5),
('Float64_column', 1.5),
('Float32_NaN_column', float('nan')),
('Float64_Inf_column', float('-inf')),
('Date_column', datetime.date(2019, 12, 8)),
('Date_min_column', '0000-00-00'),
('Date_after_min_column', datetime.date(1970, 1, 2)),
('DateTime_column', datetime.datetime(2019, 12, 8, 8, 24, 3)),
]
for key, value in expected:
if isinstance(value, float) and math.isnan(value):
assert math.isnan(result[key])
else:
assert result[key] == value

View File

@ -0,0 +1,26 @@
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryTimeoutExceedException, QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node')
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_different_versions(start_cluster):
with pytest.raises(QueryTimeoutExceedException):
node.query("SELECT sleep(3)", timeout=1)
with pytest.raises(QueryRuntimeException):
node.query("SELECT 1", settings={'max_concurrent_queries_for_user': 1})
assert node.contains_in_log('Too many simultaneous queries for user')
assert not node.contains_in_log('Unknown packet')

View File

@ -0,0 +1,16 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,17 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>500</flush_interval_milliseconds>
</part_log>
</yandex>

View File

@ -0,0 +1,160 @@
import pytest
import threading
import time
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1',
config_dir='configs',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
macros={"shard": 0, "replica": 1} )
node2 = cluster.add_instance('node2',
config_dir='configs',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
macros={"shard": 0, "replica": 2} )
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def split_tsv(data):
return [ x.split("\t") for x in data.splitlines() ]
@pytest.mark.parametrize("replicated", [
"",
"replicated"
])
def test_merge_simple(started_cluster, replicated):
try:
clickhouse_path = "/var/lib/clickhouse"
name = "test_merge_simple"
nodes = [node1, node2] if replicated else [node1]
engine = "ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')" if replicated else "MergeTree()"
node_check = nodes[-1]
starting_block = 0 if replicated else 1
for node in nodes:
node.query("""
CREATE TABLE {name}
(
`a` Int64
)
ENGINE = {engine}
ORDER BY sleep(2)
""".format(engine=engine, name=name))
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
node1.query("INSERT INTO {name} VALUES (2)".format(name=name))
node1.query("INSERT INTO {name} VALUES (3)".format(name=name))
parts = ["all_{}_{}_0".format(x, x) for x in range(starting_block, starting_block+3)]
result_part = "all_{}_{}_1".format(starting_block, starting_block+2)
def optimize():
node1.query("OPTIMIZE TABLE {name}".format(name=name))
wait = threading.Thread(target=time.sleep, args=(5,))
wait.start()
t = threading.Thread(target=optimize)
t.start()
time.sleep(1)
assert split_tsv(node_check.query("""
SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
FROM system.merges
WHERE table = '{name}'
""".format(name=name))) == [
[
"default",
name,
"3",
"['{}','{}','{}']".format(*parts),
"['{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/']".format(*parts, clickhouse=clickhouse_path, name=name),
result_part,
"{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name),
"all",
"0"
]
]
t.join()
wait.join()
assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == ""
finally:
for node in nodes:
node.query("DROP TABLE {name}".format(name=name))
@pytest.mark.parametrize("replicated", [
"",
"replicated"
])
def test_mutation_simple(started_cluster, replicated):
try:
clickhouse_path = "/var/lib/clickhouse"
name = "test_mutation_simple"
nodes = [node1, node2] if replicated else [node1]
engine = "ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')" if replicated else "MergeTree()"
node_check = nodes[-1]
starting_block = 0 if replicated else 1
for node in nodes:
node.query("""
CREATE TABLE {name}
(
`a` Int64
)
ENGINE = {engine}
ORDER BY tuple()
""".format(engine=engine, name=name))
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
part = "all_{}_{}_0".format(starting_block, starting_block)
result_part = "all_{}_{}_0_{}".format(starting_block, starting_block, starting_block+1)
def alter():
node1.query("ALTER TABLE {name} UPDATE a = 42 WHERE sleep(2) OR 1".format(name=name))
t = threading.Thread(target=alter)
t.start()
time.sleep(1)
assert split_tsv(node_check.query("""
SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
FROM system.merges
WHERE table = '{name}'
""".format(name=name))) == [
[
"default",
name,
"1",
"['{}']".format(part),
"['{clickhouse}/data/default/{name}/{}/']".format(part, clickhouse=clickhouse_path, name=name),
result_part,
"{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name),
"all",
"1"
],
]
t.join()
time.sleep(1.5)
assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == ""
finally:
for node in nodes:
node.query("DROP TABLE {name}".format(name=name))

View File

@ -0,0 +1,25 @@
<test>
<type>loop</type>
<stop_conditions>
<all_of>
<iterations>5</iterations>
<min_time_not_changing_for_ms>10000</min_time_not_changing_for_ms>
</all_of>
<any_of>
<iterations>100</iterations>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<min_time/>
</main_metric>
<preconditions>
<table_exists>test.hits</table_exists>
</preconditions>
<query>SELECT Title FROM test.hits ORDER BY Title DESC LIMIT 1000, 10</query>
<query>SELECT Title FROM test.hits ORDER BY Title DESC COLLATE 'tr' LIMIT 1000, 10</query>
</test>

View File

@ -1,15 +0,0 @@
<test>
<type>once</type>
<stop_conditions>
<any_of>
<average_speed_not_changing_for_ms>1000</average_speed_not_changing_for_ms>
<total_time_ms>10000</total_time_ms>
</any_of>
</stop_conditions>
<!-- lon [-180; 180], lat [-90; 90] -->
<query>SELECT count() FROM system.numbers WHERE NOT ignore(greatCircleDistance((rand() % 360) * 1. - 180, (number % 150) * 1.2 - 90, (number % 360) + toFloat64(rand()) / 4294967296 - 180, (rand() % 180) * 1. - 90))</query>
<!-- 55.755830, 37.617780 is center of Moscow -->
<query>SELECT count() FROM system.numbers WHERE NOT ignore(greatCircleDistance(55. + toFloat64(rand()) / 4294967296, 37. + toFloat64(rand()) / 4294967296, 55. + toFloat64(rand()) / 4294967296, 37. + toFloat64(rand()) / 4294967296))</query>
</test>

View File

@ -1,15 +1,18 @@
Русский (default)
Ё
А
Я
а
я
ё
Русский (ru)
а
А
ё
Ё
я
Я
Русский (ru distributed)
а
а
А
@ -22,6 +25,7 @@
я
Я
Я
Türk (default)
A
A
B
@ -132,6 +136,7 @@ z
ı
Ş
ş
Türk (tr)
a
a
A
@ -242,9 +247,62 @@ z
z
Z
Z
english (default)
A
Q
Z
c
e
english (en_US)
A
c
e
Q
Z
english (en)
A
c
e
Q
Z
español (default)
F
J
z
Ñ
español (es)
F
J
Ñ
z
Український (default)
І
Б
ї
ґ
Український (uk)
Б
ґ
І
ї
Русский (ru group by)
а 1
А 4
ё 3
Ё 6
я 2
Я 5
ζ
0
1
0
1
10
2
3
4
5
6
7
8
9

View File

@ -1,6 +1,49 @@
SELECT 'Русский (default)';
SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x ORDER BY x;
SELECT 'Русский (ru)';
SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x ORDER BY x COLLATE 'ru';
SELECT 'Русский (ru distributed)';
SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x FROM remote('127.0.0.{2,3}', system, one) ORDER BY x COLLATE 'ru';
SELECT 'Türk (default)';
SELECT arrayJoin(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'ç', 'd', 'e', 'f', 'g', 'ğ', 'h', 'ı', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'ö', 'p', 'r', 's', 'ş', 't', 'u', 'ü', 'v', 'y', 'z', 'A', 'B', 'C', 'Ç', 'D', 'E', 'F', 'G', 'Ğ', 'H', 'I', 'İ', 'J', 'K', 'L', 'M', 'N', 'O', 'Ö', 'P', 'R', 'S', 'Ş', 'T', 'U', 'Ü', 'V', 'Y', 'Z']) AS x ORDER BY x;
SELECT 'Türk (tr)';
SELECT arrayJoin(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'ç', 'd', 'e', 'f', 'g', 'ğ', 'h', 'ı', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'ö', 'p', 'r', 's', 'ş', 't', 'u', 'ü', 'v', 'y', 'z', 'A', 'B', 'C', 'Ç', 'D', 'E', 'F', 'G', 'Ğ', 'H', 'I', 'İ', 'J', 'K', 'L', 'M', 'N', 'O', 'Ö', 'P', 'R', 'S', 'Ş', 'T', 'U', 'Ü', 'V', 'Y', 'Z']) AS x ORDER BY x COLLATE 'tr';
SELECT 'english (default)';
SELECT arrayJoin(['A', 'c', 'Z', 'Q', 'e']) AS x ORDER BY x;
SELECT 'english (en_US)';
SELECT arrayJoin(['A', 'c', 'Z', 'Q', 'e']) AS x ORDER BY x COLLATE 'en_US';
SELECT 'english (en)';
SELECT arrayJoin(['A', 'c', 'Z', 'Q', 'e']) AS x ORDER BY x COLLATE 'en';
SELECT 'español (default)';
SELECT arrayJoin(['F', 'z', 'J', 'Ñ']) as x ORDER BY x;
SELECT 'español (es)';
SELECT arrayJoin(['F', 'z', 'J', 'Ñ']) as x ORDER BY x COLLATE 'es';
SELECT 'Український (default)';
SELECT arrayJoin(['ґ', 'ї', 'І', 'Б']) as x ORDER BY x;
SELECT 'Український (uk)';
SELECT arrayJoin(['ґ', 'ї', 'І', 'Б']) as x ORDER BY x COLLATE 'uk';
SELECT 'Русский (ru group by)';
SELECT x, n FROM (SELECT ['а', 'я', 'ё', 'А', 'Я', 'Ё'] AS arr) ARRAY JOIN arr AS x, arrayEnumerate(arr) AS n ORDER BY x COLLATE 'ru', n;
--- Const expression
SELECT 'ζ' as x ORDER BY x COLLATE 'el';
-- check order by const with collation
SELECT number FROM numbers(2) ORDER BY 'x' COLLATE 'el';
-- check const and non const columns in order
SELECT number FROM numbers(11) ORDER BY 'x', toString(number), 'y' COLLATE 'el';
--- Trash locales
SELECT '' as x ORDER BY x COLLATE 'qq'; --{serverError 186}
SELECT '' as x ORDER BY x COLLATE 'qwe'; --{serverError 186}
SELECT '' as x ORDER BY x COLLATE 'some_non_existing_locale'; --{serverError 186}
SELECT '' as x ORDER BY x COLLATE 'ру'; --{serverError 186}

View File

@ -166,3 +166,12 @@ d
e
u
v
--JSONExtractArrayRaw--
[]
[]
[]
['[]','[]']
['-100','200','300']
['1','2','3','4','5','"hello"']
['1','2','3']
['4','5','6']

View File

@ -182,3 +182,12 @@ SELECT JSONExtractRaw('{"abc":"\\u263a"}', 'abc');
SELECT '--const/non-const mixed--';
SELECT JSONExtractString('["a", "b", "c", "d", "e"]', idx) FROM (SELECT arrayJoin([1,2,3,4,5]) AS idx);
SELECT JSONExtractString(json, 's') FROM (SELECT arrayJoin(['{"s":"u"}', '{"s":"v"}']) AS json);
SELECT '--JSONExtractArrayRaw--';
SELECT JSONExtractArrayRaw('');
SELECT JSONExtractArrayRaw('{"a": "hello", "b": "not_array"}');
SELECT JSONExtractArrayRaw('[]');
SELECT JSONExtractArrayRaw('[[],[]]');
SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b');
SELECT JSONExtractArrayRaw('[1,2,3,4,5,"hello"]');
SELECT JSONExtractArrayRaw(arrayJoin(JSONExtractArrayRaw('[[1,2,3],[4,5,6]]')));

View File

@ -4,6 +4,7 @@ set -e
CLICKHOUSE_USER=${CLICKHOUSE_USER:=clickhouse}
CLICKHOUSE_GROUP=${CLICKHOUSE_GROUP:=${CLICKHOUSE_USER}}
# Please note that we don't support paths with whitespaces. This is rather ignorant.
CLICKHOUSE_CONFDIR=${CLICKHOUSE_CONFDIR:=/etc/clickhouse-server}
CLICKHOUSE_DATADIR=${CLICKHOUSE_DATADIR:=/var/lib/clickhouse}
CLICKHOUSE_LOGDIR=${CLICKHOUSE_LOGDIR:=/var/log/clickhouse-server}
@ -135,6 +136,8 @@ Please fix this and reinstall this package." >&2
defaultpassword="$RET"
if [ -n "$defaultpassword" ]; then
echo "<yandex><users><default><password>$defaultpassword</password></default></users></yandex>" > ${CLICKHOUSE_CONFDIR}/users.d/default-password.xml
chown ${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP} ${CLICKHOUSE_CONFDIR}/users.d/default-password.xml
chmod 600 ${CLICKHOUSE_CONFDIR}/users.d/default-password.xml
fi
# everything went well, so now let's reset the password

View File

@ -206,4 +206,16 @@ Example:
SELECT JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b') = '[-100, 200.0, 300]'
```
## JSONExtractArrayRaw(json[, indices_or_keys]...)
Returns an array of JSON array elements, each represented as an unparsed string.
If the part does not exist or is not an array, an empty array is returned.
Example:
```sql
SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, "hello"]}', 'b') = ['-100', '200.0', '"hello"']'
```
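It also works on a top-level JSON array, and nested arrays can be unnested by combining it with `arrayJoin`, as in the tests added in this commit:

```sql
SELECT JSONExtractArrayRaw('[1,2,3,4,5,"hello"]'); -- ['1','2','3','4','5','"hello"']
SELECT JSONExtractArrayRaw(arrayJoin(JSONExtractArrayRaw('[[1,2,3],[4,5,6]]'))); -- ['1','2','3'] and ['4','5','6']
```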
[Original article](https://clickhouse.yandex/docs/en/query_language/functions/json_functions/) <!--hide-->

View File

@ -4,8 +4,39 @@
Returns a string with the name of the host that this function was performed on. For distributed processing, this is the name of the remote server host, if the function is performed on a remote server.
## FQDN(), fullHostName()
Returns the fully qualified domain name, aka [FQDN](https://en.wikipedia.org/wiki/Fully_qualified_domain_name).
## FQDN {#fqdn}
Returns the fully qualified domain name.
**Syntax**
```sql
fqdn();
```
This function is case-insensitive.
**Returned value**
- String with the fully qualified domain name.
Type: `String`.
**Example**
Query:
```sql
SELECT FQDN();
```
Result:
```text
┌─FQDN()──────────────────────────┐
│ clickhouse.ru-central1.internal │
└─────────────────────────────────┘
```
## basename

View File

@ -199,9 +199,9 @@ SELECT JSONExtractKeysAndValues('{"x": {"a": 5, "b": 7, "c": 11}}', 'x', 'Int8')
## JSONExtractRaw(json[, indices_or_keys]...)
Returns a part of the JSON.
Returns a part of the JSON as a string containing an unparsed fragment.
If the value does not exist or has the wrong type, an empty string is returned.
If the value does not exist, an empty string is returned.
Example:
@ -209,4 +209,16 @@ SELECT JSONExtractKeysAndValues('{"x": {"a": 5, "b": 7, "c": 11}}', 'x', 'Int8')
SELECT JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b') = '[-100, 200.0, 300]'
```
## JSONExtractArrayRaw(json[, indices_or_keys]...)
Returns an array of JSON array elements, each represented as a string containing the unparsed JSON fragment.
If the value does not exist or is not an array, an empty array is returned.
Example:
```sql
SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, "hello"]}', 'b') = ['-100', '200.0', '"hello"']'
```
[Original article](https://clickhouse.yandex/docs/ru/query_language/functions/json_functions/) <!--hide-->

View File

@ -4,6 +4,40 @@
Returns a string with the name of the host on which this function was executed. For distributed query processing, this is the name of the remote server host, if the function is executed on a remote server.
## FQDN {#fqdn}
Returns the fully qualified domain name.
**Syntax**
```sql
fqdn();
```
This function is case-insensitive.
**Returned value**
- Fully qualified domain name.
Type: `String`.
**Example**
Query:
```sql
SELECT FQDN();
```
Result:
```text
┌─FQDN()──────────────────────────┐
│ clickhouse.ru-central1.internal │
└─────────────────────────────────┘
```
## basename
Extracts the trailing part of a string after the last slash or backslash. This function is often used to extract the file name from a path.

View File

@ -21,7 +21,6 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (corrector_utf8)
add_subdirectory (zookeeper-cli)
add_subdirectory (zookeeper-dump-tree)
add_subdirectory (zookeeper-copy-tree)
add_subdirectory (zookeeper-remove-by-list)
add_subdirectory (zookeeper-create-entry-to-download-part)
add_subdirectory (zookeeper-adjust-block-numbers-to-parts)

View File

@ -1,2 +0,0 @@
add_executable (zookeeper-copy-tree main.cpp ${SRCS})
target_link_libraries(zookeeper-copy-tree PRIVATE clickhouse_common_zookeeper clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})

View File

@ -1,149 +0,0 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/Exception.h>
#include <boost/program_options.hpp>
#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
}
}
int main(int argc, char ** argv)
try
{
boost::program_options::options_description desc("Allowed options");
desc.add_options()
("help,h", "produce help message")
("from", boost::program_options::value<std::string>()->required(),
"addresses of source ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181")
("from-path", boost::program_options::value<std::string>()->required(),
"where to copy from")
("to", boost::program_options::value<std::string>()->required(),
"addresses of destination ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181")
("to-path", boost::program_options::value<std::string>()->required(),
"where to copy to")
;
boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
if (options.count("help"))
{
std::cout << "Copy a ZooKeeper tree to another cluster." << std::endl;
std::cout << "Usage: " << argv[0] << " [options]" << std::endl;
std::cout << "WARNING: it is almost useless as it is impossible to corretly copy sequential nodes" << std::endl;
std::cout << desc << std::endl;
return 1;
}
zkutil::ZooKeeper from_zookeeper(options.at("from").as<std::string>());
zkutil::ZooKeeper to_zookeeper(options.at("to").as<std::string>());
std::string from_path = options.at("from-path").as<std::string>();
std::string to_path = options.at("to-path").as<std::string>();
if (to_zookeeper.exists(to_path))
throw DB::Exception("Destination path: " + to_path + " already exists, aborting.",
DB::ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER);
struct Node
{
Node(
std::string path_,
std::future<Coordination::GetResponse> get_future_,
std::future<Coordination::ListResponse> children_future_,
Node * parent_)
: path(std::move(path_))
, get_future(std::move(get_future_))
, children_future(std::move(children_future_))
, parent(parent_)
{
}
std::string path;
std::future<Coordination::GetResponse> get_future;
std::future<Coordination::ListResponse> children_future;
Node * parent = nullptr;
std::future<Coordination::CreateResponse> create_future;
bool created = false;
bool deleted = false;
bool ephemeral = false;
};
std::list<Node> nodes_queue;
nodes_queue.emplace_back(
from_path, from_zookeeper.asyncGet(from_path), from_zookeeper.asyncGetChildren(from_path), nullptr);
to_zookeeper.createAncestors(to_path);
for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it)
{
Coordination::GetResponse get_response;
Coordination::ListResponse children_response;
try
{
get_response = it->get_future.get();
children_response = it->children_future.get();
}
catch (const Coordination::Exception & e)
{
if (e.code == Coordination::ZNONODE)
{
it->deleted = true;
continue;
}
throw;
}
if (get_response.stat.ephemeralOwner)
{
it->ephemeral = true;
continue;
}
if (it->parent && !it->parent->created)
{
it->parent->create_future.get();
it->parent->created = true;
std::cerr << it->parent->path << " copied!" << std::endl;
}
std::string new_path = it->path;
new_path.replace(0, from_path.length(), to_path);
it->create_future = to_zookeeper.asyncCreate(new_path, get_response.data, zkutil::CreateMode::Persistent);
get_response.data.clear();
get_response.data.shrink_to_fit();
for (const auto & name : children_response.names)
{
std::string child_path = it->path == "/" ? it->path + name : it->path + '/' + name;
nodes_queue.emplace_back(
child_path, from_zookeeper.asyncGet(child_path), from_zookeeper.asyncGetChildren(child_path),
&(*it));
}
}
for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it)
{
if (!it->created && !it->deleted && !it->ephemeral)
{
it->create_future.get();
it->created = true;
std::cerr << it->path << " copied!" << std::endl;
}
}
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
throw;
}