diff --git a/CMakeLists.txt b/CMakeLists.txt
index 986096ba9e8..34b1035f0cd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -382,16 +382,12 @@ add_subdirectory (contrib EXCLUDE_FROM_ALL)
 macro (add_executable target)
     # invoke built-in add_executable
-    _add_executable (${ARGV})
+    # explicitly acquire and interpose malloc symbols by clickhouse_malloc
+    _add_executable (${ARGV} $<TARGET_OBJECTS:clickhouse_malloc>)
     get_target_property (type ${target} TYPE)
     if (${type} STREQUAL EXECUTABLE)
-        file (RELATIVE_PATH dir ${CMAKE_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
-        if (${dir} MATCHES "^dbms")
-            # Only interpose operator::new/delete for dbms executables (MemoryTracker stuff)
-            target_link_libraries (${target} PRIVATE clickhouse_new_delete ${MALLOC_LIBRARIES})
-        else ()
-            target_link_libraries (${target} PRIVATE ${MALLOC_LIBRARIES})
-        endif ()
+        # operator::new/delete for executables (MemoryTracker stuff)
+        target_link_libraries (${target} PRIVATE clickhouse_new_delete ${MALLOC_LIBRARIES})
     endif()
 endmacro()
diff --git a/cmake/tools.cmake b/cmake/tools.cmake
index 5a183f9eeba..5b6572c9bae 100644
--- a/cmake/tools.cmake
+++ b/cmake/tools.cmake
@@ -20,9 +20,11 @@ else ()
     message (WARNING "You are using an unsupported compiler. Compilation has only been tested with Clang 6+ and GCC 7+.")
 endif ()
-option (LINKER_NAME "Linker name or full path")
+STRING(REGEX MATCHALL "[0-9]+" COMPILER_VERSION_LIST ${CMAKE_CXX_COMPILER_VERSION})
+LIST(GET COMPILER_VERSION_LIST 0 COMPILER_VERSION_MAJOR)
-find_program (LLD_PATH NAMES "ld.lld" "lld")
+option (LINKER_NAME "Linker name or full path")
+find_program (LLD_PATH NAMES "ld.lld" "lld" "lld-${COMPILER_VERSION_MAJOR}")
 find_program (GOLD_PATH NAMES "ld.gold" "gold")
 if (NOT LINKER_NAME)
diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index 70dccacef48..e128aacb34c 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -100,7 +100,7 @@ set(dbms_sources)
 add_headers_and_sources(clickhouse_common_io src/Common)
 add_headers_and_sources(clickhouse_common_io src/Common/HashTable)
 add_headers_and_sources(clickhouse_common_io src/IO)
-list (REMOVE_ITEM clickhouse_common_io_sources src/Common/new_delete.cpp)
+list (REMOVE_ITEM clickhouse_common_io_sources src/Common/malloc.cpp src/Common/new_delete.cpp)
 if(USE_RDKAFKA)
     add_headers_and_sources(dbms src/Storages/Kafka)
@@ -140,6 +140,9 @@ endif ()
 add_library(clickhouse_common_io ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
+add_library (clickhouse_malloc OBJECT src/Common/malloc.cpp)
+set_source_files_properties(src/Common/malloc.cpp PROPERTIES COMPILE_FLAGS "-fno-builtin")
+
 add_library (clickhouse_new_delete STATIC src/Common/new_delete.cpp)
 target_link_libraries (clickhouse_new_delete PRIVATE clickhouse_common_io)
diff --git a/dbms/programs/odbc-bridge/CMakeLists.txt b/dbms/programs/odbc-bridge/CMakeLists.txt
index 460dfd007d4..d03ff257562 100644
--- a/dbms/programs/odbc-bridge/CMakeLists.txt
+++ b/dbms/programs/odbc-bridge/CMakeLists.txt
@@ -30,11 +30,6 @@ if (Poco_Data_FOUND)
     set(CLICKHOUSE_ODBC_BRIDGE_LINK ${CLICKHOUSE_ODBC_BRIDGE_LINK} PRIVATE ${Poco_Data_LIBRARY})
     set(CLICKHOUSE_ODBC_BRIDGE_INCLUDE ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} SYSTEM PRIVATE ${Poco_Data_INCLUDE_DIR})
 endif ()
-if (USE_JEMALLOC)
-    # We need to link jemalloc directly to odbc-bridge-library, because in other case
-    # we will build it with default malloc.
- set(CLICKHOUSE_ODBC_BRIDGE_LINK ${CLICKHOUSE_ODBC_BRIDGE_LINK} PRIVATE ${JEMALLOC_LIBRARIES}) -endif() clickhouse_program_add_library(odbc-bridge) diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp index 7103769d54e..3378878e718 100644 --- a/dbms/programs/server/TCPHandler.cpp +++ b/dbms/programs/server/TCPHandler.cpp @@ -200,6 +200,8 @@ void TCPHandler::runImpl() /// So, the stream has been marked as cancelled and we can't read from it anymore. state.block_in.reset(); state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker. + + state.temporary_tables_read = true; }); /// Send structure of columns to client for function input() @@ -339,6 +341,18 @@ void TCPHandler::runImpl() LOG_WARNING(log, "Client has gone away."); } + try + { + if (exception && !state.temporary_tables_read) + query_context->initializeExternalTablesIfSet(); + } + catch (...) + { + network_error = true; + LOG_WARNING(log, "Can't read external tables after query failure."); + } + + try { query_scope.reset(); diff --git a/dbms/programs/server/TCPHandler.h b/dbms/programs/server/TCPHandler.h index 561ed4d0eca..4ab9097b9bb 100644 --- a/dbms/programs/server/TCPHandler.h +++ b/dbms/programs/server/TCPHandler.h @@ -63,6 +63,8 @@ struct QueryState bool sent_all_data = false; /// Request requires data from the client (INSERT, but not INSERT SELECT). bool need_receive_data_for_insert = false; + /// Temporary tables read + bool temporary_tables_read = false; /// Request requires data from client for function input() bool need_receive_data_for_input = false; diff --git a/dbms/src/Columns/Collator.cpp b/dbms/src/Columns/Collator.cpp index 7e8cfba1aac..7f8edc14fd6 100644 --- a/dbms/src/Columns/Collator.cpp +++ b/dbms/src/Columns/Collator.cpp @@ -4,6 +4,9 @@ #if USE_ICU #include + #include + #include + #include #else #ifdef __clang__ #pragma clang diagnostic ignored "-Wunused-private-field" @@ -14,6 +17,7 @@ #include #include #include +#include namespace DB @@ -26,16 +30,81 @@ namespace DB } } -Collator::Collator(const std::string & locale_) : locale(Poco::toLower(locale_)) + +AvailableCollationLocales::AvailableCollationLocales() { #if USE_ICU + static const size_t MAX_LANG_LENGTH = 128; + size_t available_locales_count = ucol_countAvailable(); + for (size_t i = 0; i < available_locales_count; ++i) + { + std::string locale_name = ucol_getAvailable(i); + UChar lang_buffer[MAX_LANG_LENGTH]; + char normal_buf[MAX_LANG_LENGTH]; + UErrorCode status = U_ZERO_ERROR; + + /// All names will be in English language + size_t lang_length = uloc_getDisplayLanguage( + locale_name.c_str(), "en", lang_buffer, MAX_LANG_LENGTH, &status); + std::optional lang; + + if (!U_FAILURE(status)) + { + /// Convert language name from UChar array to normal char array. 
+ /// We use English language for name, so all UChar's length is equal to sizeof(char) + u_UCharsToChars(lang_buffer, normal_buf, lang_length); + lang.emplace(std::string(normal_buf, lang_length)); + } + + locales_map.emplace(Poco::toLower(locale_name), LocaleAndLanguage{locale_name, lang}); + } + +#endif +} + +const AvailableCollationLocales & AvailableCollationLocales::instance() +{ + static AvailableCollationLocales instance; + return instance; +} + +AvailableCollationLocales::LocalesVector AvailableCollationLocales::getAvailableCollations() const +{ + LocalesVector result; + for (const auto & name_and_locale : locales_map) + result.push_back(name_and_locale.second); + + auto comparator = [] (const LocaleAndLanguage & f, const LocaleAndLanguage & s) + { + return f.locale_name < s.locale_name; + }; + std::sort(result.begin(), result.end(), comparator); + + return result; +} + +bool AvailableCollationLocales::isCollationSupported(const std::string & locale_name) const +{ + /// We support locale names in any case, so we have to convert all to lower case + return locales_map.count(Poco::toLower(locale_name)); +} + +Collator::Collator(const std::string & locale_) + : locale(Poco::toLower(locale_)) +{ +#if USE_ICU + /// We check it here, because ucol_open will fallback to default locale for + /// almost all random names. + if (!AvailableCollationLocales::instance().isCollationSupported(locale)) + throw DB::Exception("Unsupported collation locale: " + locale, DB::ErrorCodes::UNSUPPORTED_COLLATION_LOCALE); + UErrorCode status = U_ZERO_ERROR; collator = ucol_open(locale.c_str(), &status); - if (status != U_ZERO_ERROR) + if (U_FAILURE(status)) { ucol_close(collator); - throw DB::Exception("Unsupported collation locale: " + locale, DB::ErrorCodes::UNSUPPORTED_COLLATION_LOCALE); + throw DB::Exception("Failed to open locale: " + locale + " with error: " + u_errorName(status), DB::ErrorCodes::UNSUPPORTED_COLLATION_LOCALE); } #else throw DB::Exception("Collations support is disabled, because ClickHouse was built without ICU library", DB::ErrorCodes::SUPPORT_IS_DISABLED); @@ -60,8 +129,8 @@ int Collator::compare(const char * str1, size_t length1, const char * str2, size UErrorCode status = U_ZERO_ERROR; UCollationResult compare_result = ucol_strcollIter(collator, &iter1, &iter2, &status); - if (status != U_ZERO_ERROR) - throw DB::Exception("ICU collation comparison failed with error code: " + DB::toString(status), + if (U_FAILURE(status)) + throw DB::Exception("ICU collation comparison failed with error code: " + std::string(u_errorName(status)), DB::ErrorCodes::COLLATION_COMPARISON_FAILED); /** Values of enum UCollationResult are equals to what exactly we need: @@ -83,14 +152,3 @@ const std::string & Collator::getLocale() const { return locale; } - -std::vector Collator::getAvailableCollations() -{ - std::vector result; -#if USE_ICU - size_t available_locales_count = ucol_countAvailable(); - for (size_t i = 0; i < available_locales_count; ++i) - result.push_back(ucol_getAvailable(i)); -#endif - return result; -} diff --git a/dbms/src/Columns/Collator.h b/dbms/src/Columns/Collator.h index 0bafe6b1dba..df60aaba434 100644 --- a/dbms/src/Columns/Collator.h +++ b/dbms/src/Columns/Collator.h @@ -3,9 +3,39 @@ #include #include #include +#include + struct UCollator; +/// Class represents available locales for collations. 
+class AvailableCollationLocales : private boost::noncopyable +{ +public: + + struct LocaleAndLanguage + { + std::string locale_name; /// ISO locale code + std::optional language; /// full language name in English + }; + + using AvailableLocalesMap = std::unordered_map; + using LocalesVector = std::vector; + + static const AvailableCollationLocales & instance(); + + /// Get all collations with names in sorted order + LocalesVector getAvailableCollations() const; + + /// Check that collation is supported + bool isCollationSupported(const std::string & locale_name) const; + +private: + AvailableCollationLocales(); +private: + AvailableLocalesMap locales_map; +}; + class Collator : private boost::noncopyable { public: @@ -15,10 +45,8 @@ public: int compare(const char * str1, size_t length1, const char * str2, size_t length2) const; const std::string & getLocale() const; - - static std::vector getAvailableCollations(); - private: + std::string locale; UCollator * collator; }; diff --git a/dbms/src/Common/malloc.cpp b/dbms/src/Common/malloc.cpp new file mode 100644 index 00000000000..1c45fd88605 --- /dev/null +++ b/dbms/src/Common/malloc.cpp @@ -0,0 +1,40 @@ +#if defined(OS_LINUX) +#include + +/// Interposing these symbols explicitly. The idea works like this: malloc.cpp compiles to a +/// dedicated object (namely clickhouse_malloc.o), and it will show earlier in the link command +/// than malloc libs like libjemalloc.a. As a result, these symbols get picked in time right after. +extern "C" +{ + void *malloc(size_t size); + void free(void *ptr); + void *calloc(size_t nmemb, size_t size); + void *realloc(void *ptr, size_t size); + int posix_memalign(void **memptr, size_t alignment, size_t size); + void *aligned_alloc(size_t alignment, size_t size); + void *valloc(size_t size); + void *memalign(size_t alignment, size_t size); + void *pvalloc(size_t size); +} + +template +inline void ignore(T x __attribute__((unused))) +{ +} + +static void dummyFunctionForInterposing() __attribute__((used)); +static void dummyFunctionForInterposing() +{ + void* dummy; + /// Suppression for PVS-Studio. 
+    free(nullptr); // -V575
+    ignore(malloc(0)); // -V575
+    ignore(calloc(0, 0)); // -V575
+    ignore(realloc(nullptr, 0)); // -V575
+    ignore(posix_memalign(&dummy, 0, 0)); // -V575
+    ignore(aligned_alloc(0, 0)); // -V575
+    ignore(valloc(0)); // -V575
+    ignore(memalign(0, 0)); // -V575
+    ignore(pvalloc(0)); // -V575
+}
+#endif
diff --git a/dbms/src/Core/MySQLProtocol.cpp b/dbms/src/Core/MySQLProtocol.cpp
index 12fd6f963a1..82af8f290a1 100644
--- a/dbms/src/Core/MySQLProtocol.cpp
+++ b/dbms/src/Core/MySQLProtocol.cpp
@@ -143,7 +143,7 @@ ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex
             flags = ColumnDefinitionFlags::BINARY_FLAG;
             break;
         case TypeIndex::Float64:
-            column_type = ColumnType::MYSQL_TYPE_TINY;
+            column_type = ColumnType::MYSQL_TYPE_DOUBLE;
             flags = ColumnDefinitionFlags::BINARY_FLAG;
             break;
         case TypeIndex::Date:
@@ -155,8 +155,6 @@ ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex
             flags = ColumnDefinitionFlags::BINARY_FLAG;
             break;
         case TypeIndex::String:
-            column_type = ColumnType::MYSQL_TYPE_STRING;
-            break;
         case TypeIndex::FixedString:
             column_type = ColumnType::MYSQL_TYPE_STRING;
             break;
diff --git a/dbms/src/DataStreams/FinishSortingBlockInputStream.cpp b/dbms/src/DataStreams/FinishSortingBlockInputStream.cpp
index e7382bf8b6d..b0bbf98b28e 100644
--- a/dbms/src/DataStreams/FinishSortingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/FinishSortingBlockInputStream.cpp
@@ -52,7 +52,7 @@ struct Less
     {
         for (auto it = left_columns.begin(), jt = right_columns.begin(); it != left_columns.end(); ++it, ++jt)
         {
-            int res = it->second.direction * it->first->compareAt(a, b, *jt->first, it->second.nulls_direction);
+            int res = it->description.direction * it->column->compareAt(a, b, *jt->column, it->description.nulls_direction);
             if (res < 0)
                 return true;
             else if (res > 0)
diff --git a/dbms/src/DataStreams/IBlockInputStream.h b/dbms/src/DataStreams/IBlockInputStream.h
index 69aadf44c09..7ca41551298 100644
--- a/dbms/src/DataStreams/IBlockInputStream.h
+++ b/dbms/src/DataStreams/IBlockInputStream.h
@@ -263,6 +263,11 @@ protected:
      */
     bool checkTimeLimit();
+#ifndef NDEBUG
+    bool read_prefix_is_called = false;
+    bool read_suffix_is_called = false;
+#endif
+
 private:
     bool enabled_extremes = false;
@@ -315,10 +320,6 @@
         return;
     }
-#ifndef NDEBUG
-    bool read_prefix_is_called = false;
-    bool read_suffix_is_called = false;
-#endif
 };
 }
diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp
index f8742d26ad9..31c4095e07f 100644
--- a/dbms/src/DataStreams/NativeBlockInputStream.cpp
+++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp
@@ -57,12 +57,19 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
     }
 }
+// Also resets a few fields from IBlockInputStream (I didn't want to propagate resetParser up there)
 void NativeBlockInputStream::resetParser()
 {
     istr_concrete = nullptr;
     use_index = false;
-    header.clear();
-    avg_value_size_hints.clear();
+
+#ifndef NDEBUG
+    read_prefix_is_called = false;
+    read_suffix_is_called = false;
+#endif
+
+    is_cancelled.store(false);
+    is_killed.store(false);
 }
 void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
diff --git a/dbms/src/Dictionaries/HTTPDictionarySource.cpp b/dbms/src/Dictionaries/HTTPDictionarySource.cpp
index 2a8269d4047..3dfdaa3e6ca 100644
--- a/dbms/src/Dictionaries/HTTPDictionarySource.cpp
+++
b/dbms/src/Dictionaries/HTTPDictionarySource.cpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -82,12 +84,9 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri) auto tmp_time = update_time; update_time = std::chrono::system_clock::now(); time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1; - char buffer[80]; - struct tm * timeinfo; - timeinfo = localtime(&hr_time); - strftime(buffer, 80, "%Y-%m-%d %H:%M:%S", timeinfo); - std::string str_time(buffer); - uri.addQueryParameter(update_field, str_time); + WriteBufferFromOwnString out; + writeDateTimeText(hr_time, out); + uri.addQueryParameter(update_field, out.str()); } else { diff --git a/dbms/src/Functions/FunctionsJSON.cpp b/dbms/src/Functions/FunctionsJSON.cpp index 673a04a8353..79dea768f61 100644 --- a/dbms/src/Functions/FunctionsJSON.cpp +++ b/dbms/src/Functions/FunctionsJSON.cpp @@ -20,6 +20,7 @@ void registerFunctionsJSON(FunctionFactory & factory) factory.registerFunction>(); factory.registerFunction>(); factory.registerFunction>(); + factory.registerFunction>(); } } diff --git a/dbms/src/Functions/FunctionsJSON.h b/dbms/src/Functions/FunctionsJSON.h index 85088bed61c..bce13f6c5d4 100644 --- a/dbms/src/Functions/FunctionsJSON.h +++ b/dbms/src/Functions/FunctionsJSON.h @@ -291,6 +291,7 @@ struct NameJSONExtractString { static constexpr auto name{"JSONExtractString"}; struct NameJSONExtract { static constexpr auto name{"JSONExtract"}; }; struct NameJSONExtractKeysAndValues { static constexpr auto name{"JSONExtractKeysAndValues"}; }; struct NameJSONExtractRaw { static constexpr auto name{"JSONExtractRaw"}; }; +struct NameJSONExtractArrayRaw { static constexpr auto name{"JSONExtractArrayRaw"}; }; template @@ -1088,4 +1089,39 @@ private: } }; +template +class JSONExtractArrayRawImpl +{ +public: + static DataTypePtr getType(const char *, const ColumnsWithTypeAndName &) + { + return std::make_shared(std::make_shared()); + } + + using Iterator = typename JSONParser::Iterator; + static bool addValueToColumn(IColumn & dest, const Iterator & it) + { + if (!JSONParser::isArray(it)) + { + return false; + } + ColumnArray & col_res = assert_cast(dest); + Iterator array_it = it; + size_t size = 0; + if (JSONParser::firstArrayElement(array_it)) + { + do + { + JSONExtractRawImpl::addValueToColumn(col_res.getData(), array_it); + ++size; + } while (JSONParser::nextArrayElement(array_it)); + } + + col_res.getOffsets().push_back(col_res.getOffsets().back() + size); + return true; + } + + static constexpr size_t num_extra_arguments = 0; + static void prepare(const char *, const Block &, const ColumnNumbers &, size_t) {} +}; } diff --git a/dbms/src/Interpreters/sortBlock.cpp b/dbms/src/Interpreters/sortBlock.cpp index 2dd9d2c681e..d2401433ca0 100644 --- a/dbms/src/Interpreters/sortBlock.cpp +++ b/dbms/src/Interpreters/sortBlock.cpp @@ -1,7 +1,9 @@ #include #include +#include #include +#include #include @@ -13,16 +15,9 @@ namespace ErrorCodes extern const int BAD_COLLATION; } - -static inline bool needCollation(const IColumn * column, const SortColumnDescription & description) +static bool isCollationRequired(const SortColumnDescription & description) { - if (!description.collator) - return false; - - if (!typeid_cast(column)) /// TODO Nullable(String) - throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION); - - return true; + return description.collator != nullptr; } @@ -38,7 +33,7 @@ ColumnsWithSortDescriptions 
getColumnsWithSortDescription(const Block & block, c ? block.getByName(description[i].column_name).column.get() : block.safeGetByPosition(description[i].column_number).column.get(); - res.emplace_back(column, description[i]); + res.emplace_back(ColumnWithSortDescription{column, description[i], isColumnConst(*column)}); } return res; @@ -55,7 +50,11 @@ struct PartialSortingLess { for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it) { - int res = it->second.direction * it->first->compareAt(a, b, *it->first, it->second.nulls_direction); + int res; + if (it->column_const) + res = 0; + else + res = it->description.direction * it->column->compareAt(a, b, *it->column, it->description.nulls_direction); if (res < 0) return true; else if (res > 0) @@ -70,22 +69,29 @@ struct PartialSortingLessWithCollation { const ColumnsWithSortDescriptions & columns; - explicit PartialSortingLessWithCollation(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {} + explicit PartialSortingLessWithCollation(const ColumnsWithSortDescriptions & columns_) + : columns(columns_) + { + } bool operator() (size_t a, size_t b) const { for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it) { int res; - if (needCollation(it->first, it->second)) + + if (it->column_const) { - const ColumnString & column_string = typeid_cast(*it->first); - res = column_string.compareAtWithCollation(a, b, *it->first, *it->second.collator); + res = 0; + } + else if (isCollationRequired(it->description)) + { + const ColumnString & column_string = assert_cast(*it->column); + res = column_string.compareAtWithCollation(a, b, *it->column, *it->description.collator); } else - res = it->first->compareAt(a, b, *it->first, it->second.nulls_direction); - - res *= it->second.direction; + res = it->column->compareAt(a, b, *it->column, it->description.nulls_direction); + res *= it->description.direction; if (res < 0) return true; else if (res > 0) @@ -100,27 +106,44 @@ void sortBlock(Block & block, const SortDescription & description, UInt64 limit) if (!block) return; + /// If only one column to sort by if (description.size() == 1) { + + IColumn::Permutation perm; bool reverse = description[0].direction == -1; const IColumn * column = !description[0].column_name.empty() ? 
block.getByName(description[0].column_name).column.get()
        : block.safeGetByPosition(description[0].column_number).column.get();
-    IColumn::Permutation perm;
-    if (needCollation(column, description[0]))
+    bool is_column_const = false;
+    if (isCollationRequired(description[0]))
     {
-        const ColumnString & column_string = typeid_cast<const ColumnString &>(*column);
-        column_string.getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
+        /// if it's a real String column, then we need to sort it
+        if (const ColumnString * column_string = checkAndGetColumn<ColumnString>(column))
+            column_string->getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
+        else if (checkAndGetColumnConstData<ColumnString>(column))
+            is_column_const = true;
+        else
+            throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
+
    }
-    else
+    else if (!isColumnConst(*column))
         column->getPermutation(reverse, limit, description[0].nulls_direction, perm);
+    else
+        /// we don't need to do anything with a const column
+        is_column_const = true;
     size_t columns = block.columns();
     for (size_t i = 0; i < columns; ++i)
-        block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
+    {
+        if (!is_column_const)
+            block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
+        else if (limit != 0) // LIMIT exists
+            block.getByPosition(i).column = block.getByPosition(i).column->cut(0, limit);
+    }
 }
 else
 {
@@ -137,10 +160,13 @@ void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
     for (size_t i = 0, num_sort_columns = description.size(); i < num_sort_columns; ++i)
     {
-        if (needCollation(columns_with_sort_desc[i].first, description[i]))
+        const IColumn * column = columns_with_sort_desc[i].column;
+        if (isCollationRequired(description[i]))
         {
+            if (!checkAndGetColumn<ColumnString>(column) && !checkAndGetColumnConstData<ColumnString>(column))
+                throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
+
             need_collation = true;
-            break;
         }
     }
@@ -165,7 +191,9 @@ void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
     size_t columns = block.columns();
     for (size_t i = 0; i < columns; ++i)
+    {
         block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
+    }
 }
 }
diff --git a/dbms/src/Interpreters/sortBlock.h b/dbms/src/Interpreters/sortBlock.h
index 06754e0d388..a3b0a10b8f9 100644
--- a/dbms/src/Interpreters/sortBlock.h
+++ b/dbms/src/Interpreters/sortBlock.h
@@ -29,7 +29,17 @@ void stableGetPermutation(const Block & block, const SortDescription & descripti
  */
 bool isAlreadySorted(const Block & block, const SortDescription & description);
-using ColumnsWithSortDescriptions = std::vector<std::pair<const IColumn *, SortColumnDescription>>;
+/// Column with its description for sorting
+struct ColumnWithSortDescription
+{
+    const IColumn * column;
+    SortColumnDescription description;
+
+    /// Means that this column is a ColumnConst
+    bool column_const = false;
+};
+
+using ColumnsWithSortDescriptions = std::vector<ColumnWithSortDescription>;
 ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description);
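With these changes, a constant column under an explicit collation is accepted instead of throwing, and is simply cut to the LIMIT. A minimal illustration, taken from the extended 00105_shard_collations test further below ('el' is one of the ICU locales that test relies on):

```sql
-- ORDER BY a constant expression with a collation no longer throws:
SELECT number FROM numbers(2) ORDER BY 'x' COLLATE 'el';
```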
diff --git a/dbms/src/Parsers/ParserSystemQuery.cpp b/dbms/src/Parsers/ParserSystemQuery.cpp
index 11e881beaaf..0a5bd1bf63e 100644
--- a/dbms/src/Parsers/ParserSystemQuery.cpp
+++ b/dbms/src/Parsers/ParserSystemQuery.cpp
@@ -2,9 +2,7 @@
 #include
 #include
 #include
-#include
 #include
-#include
 namespace ErrorCodes
@@ -19,7 +17,7 @@ namespace DB
 bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
 {
-    if (!ParserKeyword{"SYSTEM"}.ignore(pos))
+    if (!ParserKeyword{"SYSTEM"}.ignore(pos, expected))
         return false;
     using Type = ASTSystemQuery::Type;
@@ -30,7 +28,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
     for (int i = static_cast<int>(Type::UNKNOWN) + 1; i < static_cast<int>(Type::END); ++i)
     {
         Type t = static_cast<Type>(i);
-        if (ParserKeyword{ASTSystemQuery::typeToString(t)}.ignore(pos))
+        if (ParserKeyword{ASTSystemQuery::typeToString(t)}.ignore(pos, expected))
         {
             res->type = t;
             found = true;
diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 3ce47bf9b34..b19dd4bb911 100644
--- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -6,7 +6,6 @@
 #include
 #include
-
 namespace DB
 {
 KafkaBlockInputStream::KafkaBlockInputStream(
@@ -66,20 +65,8 @@ Block KafkaBlockInputStream::readImpl()
     MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
     MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
-    auto read_callback = [&]
-    {
-        virtual_columns[0]->insert(buffer->currentTopic());     // "topic"
-        virtual_columns[1]->insert(buffer->currentKey());       // "key"
-        virtual_columns[2]->insert(buffer->currentOffset());    // "offset"
-        virtual_columns[3]->insert(buffer->currentPartition()); // "partition"
-
-        auto timestamp = buffer->currentTimestamp();
-        if (timestamp)
-            virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::milliseconds>(timestamp->get_timestamp()).count()); // "timestamp"
-    };
-
     auto input_format = FormatFactory::instance().getInputFormat(
-        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
+        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
     InputPort port(input_format->getPort().getHeader(), input_format.get());
     connect(input_format->getPort(), port);
@@ -106,13 +93,17 @@
             case IProcessor::Status::PortFull:
             {
                 auto chunk = port.pull();
-                new_rows = new_rows + chunk.getNumRows();
-                /// FIXME: materialize MATERIALIZED columns here.
+                // this was returning a bad value before https://github.com/ClickHouse/ClickHouse/pull/8005;
+                // if backported, it should go together with #8005
+                auto chunk_rows = chunk.getNumRows();
+                new_rows += chunk_rows;
                 auto columns = chunk.detachColumns();
                 for (size_t i = 0, s = columns.size(); i < s; ++i)
+                {
                     result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
+                }
                 break;
             }
             case IProcessor::Status::NeedData:
@@ -125,18 +116,55 @@
     };
     size_t total_rows = 0;
-    while (total_rows < max_block_size)
+
+    while (true)
     {
+        // some formats (like RowBinaryWithNamesAndTypes / CSVWithNames)
+        // throw an exception from readPrefix when the buffer is empty
+        if (buffer->eof())
+            break;
+
         auto new_rows = read_kafka_message();
+
+        auto _topic = buffer->currentTopic();
+        auto _key = buffer->currentKey();
+        auto _offset = buffer->currentOffset();
+        auto _partition = buffer->currentPartition();
+        auto _timestamp_raw = buffer->currentTimestamp();
+        auto _timestamp = _timestamp_raw ? std::chrono::duration_cast<std::chrono::milliseconds>(_timestamp_raw->get_timestamp()).count()
+                                         : 0;
+        for (size_t i = 0; i < new_rows; ++i)
+        {
+            virtual_columns[0]->insert(_topic);
+            virtual_columns[1]->insert(_key);
+            virtual_columns[2]->insert(_offset);
+            virtual_columns[3]->insert(_partition);
+            if (_timestamp_raw)
+            {
+                virtual_columns[4]->insert(_timestamp);
+            }
+            else
+            {
+                virtual_columns[4]->insertDefault();
+            }
+        }
+
         total_rows = total_rows + new_rows;
         buffer->allowNext();
-        if (!new_rows || !checkTimeLimit())
+        if (!new_rows || total_rows >= max_block_size || !checkTimeLimit())
             break;
     }
     if (total_rows == 0)
         return Block();
+    /// MATERIALIZED columns could be added here, but they are not needed:
+    /// it would be misleading to use them, because columns 'materialized' that way
+    /// stay 'ephemeral', i.e. are not stored anywhere.
+    /// If extra columns are needed, they can be added at the MV level using DEFAULT.
+
     auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
     auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
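On the query side, the per-message values collected above surface through the Kafka engine's virtual columns. A usage sketch (`kafka_queue` is a hypothetical table with `ENGINE = Kafka`; the underscore-prefixed names follow the upstream virtual-column convention):

```sql
SELECT _topic, _key, _offset, _partition, _timestamp
FROM kafka_queue
LIMIT 10;
```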
diff --git a/dbms/src/Storages/MergeTree/MergeList.cpp b/dbms/src/Storages/MergeTree/MergeList.cpp
index 3e4537ad45c..77e6ea32da2 100644
--- a/dbms/src/Storages/MergeTree/MergeList.cpp
+++ b/dbms/src/Storages/MergeTree/MergeList.cpp
@@ -17,6 +17,7 @@ namespace DB
 MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part)
     : database{database_}, table{table_}, partition_id{future_part.part_info.partition_id}
     , result_part_name{future_part.name}
+    , result_part_path{future_part.path}
     , result_data_version{future_part.part_info.getDataVersion()}
     , num_parts{future_part.parts.size()}
     , thread_number{getThreadNumber()}
@@ -24,6 +25,7 @@
     for (const auto & source_part : future_part.parts)
     {
         source_part_names.emplace_back(source_part->name);
+        source_part_paths.emplace_back(source_part->getFullPath());
         std::shared_lock part_lock(source_part->columns_lock);
@@ -54,6 +56,7 @@ MergeInfo MergeListElement::getInfo() const
     res.database = database;
     res.table = table;
     res.result_part_name = result_part_name;
+    res.result_part_path = result_part_path;
     res.partition_id = partition_id;
     res.is_mutation = is_mutation;
     res.elapsed = watch.elapsedSeconds();
@@ -73,6 +76,9 @@ MergeInfo MergeListElement::getInfo() const
     for (const auto & source_part_name : source_part_names)
         res.source_part_names.emplace_back(source_part_name);
+    for (const auto & source_part_path : source_part_paths)
+        res.source_part_paths.emplace_back(source_part_path);
+
     return res;
 }
diff --git a/dbms/src/Storages/MergeTree/MergeList.h b/dbms/src/Storages/MergeTree/MergeList.h
index 0a25277a6ed..98c627db24c 100644
--- a/dbms/src/Storages/MergeTree/MergeList.h
+++ b/dbms/src/Storages/MergeTree/MergeList.h
@@ -28,7 +28,9 @@ struct MergeInfo
     std::string database;
     std::string table;
     std::string result_part_name;
+    std::string result_part_path;
     Array source_part_names;
+    Array source_part_paths;
     std::string partition_id;
     bool is_mutation;
     Float64 elapsed;
@@ -55,11 +57,13 @@ struct MergeListElement : boost::noncopyable
     std::string partition_id;
     const std::string result_part_name;
+    const std::string result_part_path;
     Int64 result_data_version{};
     bool is_mutation{};
     UInt64 num_parts{};
     Names source_part_names;
+    Names source_part_paths;
     Int64 source_data_version{};
     Stopwatch watch;
diff --git 
a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 46976da03c2..b05d0744b24 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -120,6 +120,11 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_) name = part_info.getPartName(); } +void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const ReservationPtr & reservation) +{ + path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/"; +} + MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size_) : data(data_), background_pool_size(background_pool_size_), log(&Logger::get(data.getLogName() + " (MergerMutator)")) { diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 0a211d11f52..54c164566a8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -17,6 +17,7 @@ class MergeProgressCallback; struct FutureMergedMutatedPart { String name; + String path; MergeTreePartInfo part_info; MergeTreeData::DataPartsVector parts; @@ -29,6 +30,7 @@ struct FutureMergedMutatedPart } void assign(MergeTreeData::DataPartsVector parts_); + void updatePath(const MergeTreeData & storage, const ReservationPtr & reservation); }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index 749777f1279..7d0ad34396f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -581,7 +581,6 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu void MergeTreeDataPart::loadIndexGranularity() { - String full_path = getFullPath(); index_granularity_info.changeGranularityIfRequired(full_path); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index 74c3598d658..d47411c9068 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -23,6 +23,7 @@ namespace DB struct ColumnSize; class MergeTreeData; +struct FutureMergedMutatedPart; /// Description of the data part. diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 606414a2fa6..ece86c7e0bb 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -343,7 +343,7 @@ struct CurrentlyMergingPartsTagger StorageMergeTree & storage; public: - CurrentlyMergingPartsTagger(const FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation) + CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation) : future_part(future_part_), storage(storage_) { /// Assume mutex is already locked, because this method is called from mergeTask. 
@@ -361,6 +361,8 @@ public: throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE); } + future_part_.updatePath(storage, reserved_space); + for (const auto & part : future_part.parts) { if (storage.currently_merging_mutating_parts.count(part)) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 71e014ade9c..fef062c3fe6 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1016,6 +1016,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) throw Exception("Future merged part name " + backQuote(future_merged_part.name) + " differs from part name in log entry: " + backQuote(entry.new_part_name), ErrorCodes::BAD_DATA_PART_NAME); } + future_merged_part.updatePath(*this, reserved_space); MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_merged_part); @@ -1156,6 +1157,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM future_mutated_part.parts.push_back(source_part); future_mutated_part.part_info = new_part_info; future_mutated_part.name = entry.new_part_name; + future_mutated_part.updatePath(*this, reserved_space); MergeList::EntryPtr merge_entry = global_context.getMergeList().insert( database_name, table_name, future_mutated_part); diff --git a/dbms/src/Storages/System/StorageSystemCollations.cpp b/dbms/src/Storages/System/StorageSystemCollations.cpp index f2a7f5e8184..a870a7c7c78 100644 --- a/dbms/src/Storages/System/StorageSystemCollations.cpp +++ b/dbms/src/Storages/System/StorageSystemCollations.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB { @@ -8,13 +9,17 @@ NamesAndTypesList StorageSystemCollations::getNamesAndTypes() { return { {"name", std::make_shared()}, + {"language", std::make_shared(std::make_shared())}, }; } void StorageSystemCollations::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const { - for (const auto & collation_name : Collator::getAvailableCollations()) - res_columns[0]->insert(collation_name); + for (const auto & [locale, lang]: AvailableCollationLocales::instance().getAvailableCollations()) + { + res_columns[0]->insert(locale); + res_columns[1]->insert(lang ? 
*lang : Field()); + } } } diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp index 0f3b06a27de..1ff717ee9b9 100644 --- a/dbms/src/Storages/System/StorageSystemMerges.cpp +++ b/dbms/src/Storages/System/StorageSystemMerges.cpp @@ -16,6 +16,8 @@ NamesAndTypesList StorageSystemMerges::getNamesAndTypes() {"num_parts", std::make_shared()}, {"source_part_names", std::make_shared(std::make_shared())}, {"result_part_name", std::make_shared()}, + {"source_part_paths", std::make_shared(std::make_shared())}, + {"result_part_path", std::make_shared()}, {"partition_id", std::make_shared()}, {"is_mutation", std::make_shared()}, {"total_size_bytes_compressed", std::make_shared()}, @@ -45,6 +47,8 @@ void StorageSystemMerges::fillData(MutableColumns & res_columns, const Context & res_columns[i++]->insert(merge.num_parts); res_columns[i++]->insert(merge.source_part_names); res_columns[i++]->insert(merge.result_part_name); + res_columns[i++]->insert(merge.source_part_paths); + res_columns[i++]->insert(merge.result_part_path); res_columns[i++]->insert(merge.partition_id); res_columns[i++]->insert(merge.is_mutation); res_columns[i++]->insert(merge.total_size_bytes_compressed); diff --git a/dbms/tests/instructions/sanitizers.md b/dbms/tests/instructions/sanitizers.md index cfa465fcbda..c0347a32cad 100644 --- a/dbms/tests/instructions/sanitizers.md +++ b/dbms/tests/instructions/sanitizers.md @@ -6,7 +6,7 @@ Note: We use Address Sanitizer to run functional tests for every commit automati mkdir build_asan && cd build_asan ``` -Note: using clang instead of gcc is strongly recommended. +Note: using clang instead of gcc is strongly recommended. Make sure you have installed required packages (`clang`, `lld`). It may be required to specify non-standard `lld` binary using `LINKER_NAME` option (e.g. `-D LINKER_NAME=lld-8`). ``` CC=clang CXX=clang++ cmake -D SANITIZE=address .. @@ -67,5 +67,5 @@ sudo -u clickhouse UBSAN_OPTIONS='print_stacktrace=1' ./clickhouse-ubsan server # How to use Memory Sanitizer ``` -CC=clang-8 CXX=clang++-8 cmake -D ENABLE_HDFS=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_EMBEDDED_COMPILER=0 -D USE_INTERNAL_CAPNP_LIBRARY=0 -D USE_SIMDJSON=0 -DENABLE_READLINE=0 -D SANITIZE=memory .. +CC=clang-8 CXX=clang++-8 cmake -D ENABLE_HDFS=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_EMBEDDED_COMPILER=0 -D USE_INTERNAL_CAPNP_LIBRARY=0 -D USE_SIMDJSON=0 -D ENABLE_READLINE=0 -D SANITIZE=memory .. 
``` diff --git a/dbms/tests/integration/test_mysql_protocol/test.py b/dbms/tests/integration/test_mysql_protocol/test.py index d1ea106a70e..159bbc310c6 100644 --- a/dbms/tests/integration/test_mysql_protocol/test.py +++ b/dbms/tests/integration/test_mysql_protocol/test.py @@ -1,11 +1,14 @@ # coding: utf-8 +import datetime +import math import os -import docker import pytest import subprocess -import pymysql.connections +import time +import docker +import pymysql.connections from docker.models.containers import Container from helpers.cluster import ClickHouseCluster @@ -243,3 +246,58 @@ def test_mysqljs_client(server_address, nodejs_container): code, (_, _) = nodejs_container.exec_run('node test.js {host} {port} user_with_empty_password 123'.format(host=server_address, port=server_port), demux=True) assert code == 1 + + +def test_types(server_address): + client = pymysql.connections.Connection(host=server_address, user='default', password='123', database='default', port=server_port) + + cursor = client.cursor(pymysql.cursors.DictCursor) + cursor.execute( + "select " + "toInt8(-pow(2, 7)) as Int8_column, " + "toUInt8(pow(2, 8) - 1) as UInt8_column, " + "toInt16(-pow(2, 15)) as Int16_column, " + "toUInt16(pow(2, 16) - 1) as UInt16_column, " + "toInt32(-pow(2, 31)) as Int32_column, " + "toUInt32(pow(2, 32) - 1) as UInt32_column, " + "toInt64('-9223372036854775808') as Int64_column, " # -2^63 + "toUInt64('18446744073709551615') as UInt64_column, " # 2^64 - 1 + "'тест' as String_column, " + "toFixedString('тест', 8) as FixedString_column, " + "toFloat32(1.5) as Float32_column, " + "toFloat64(1.5) as Float64_column, " + "toFloat32(NaN) as Float32_NaN_column, " + "-Inf as Float64_Inf_column, " + "toDate('2019-12-08') as Date_column, " + "toDate('1970-01-01') as Date_min_column, " + "toDate('1970-01-02') as Date_after_min_column, " + "toDateTime('2019-12-08 08:24:03') as DateTime_column" + ) + + result = cursor.fetchall()[0] + expected = [ + ('Int8_column', -2 ** 7), + ('UInt8_column', 2 ** 8 - 1), + ('Int16_column', -2 ** 15), + ('UInt16_column', 2 ** 16 - 1), + ('Int32_column', -2 ** 31), + ('UInt32_column', 2 ** 32 - 1), + ('Int64_column', -2 ** 63), + ('UInt64_column', 2 ** 64 - 1), + ('String_column', 'тест'), + ('FixedString_column', 'тест'), + ('Float32_column', 1.5), + ('Float64_column', 1.5), + ('Float32_NaN_column', float('nan')), + ('Float64_Inf_column', float('-inf')), + ('Date_column', datetime.date(2019, 12, 8)), + ('Date_min_column', '0000-00-00'), + ('Date_after_min_column', datetime.date(1970, 1, 2)), + ('DateTime_column', datetime.datetime(2019, 12, 8, 8, 24, 3)), + ] + + for key, value in expected: + if isinstance(value, float) and math.isnan(value): + assert math.isnan(result[key]) + else: + assert result[key] == value diff --git a/dbms/tests/integration/test_read_temporary_tables_on_failure/__init__.py b/dbms/tests/integration/test_read_temporary_tables_on_failure/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_read_temporary_tables_on_failure/test.py b/dbms/tests/integration/test_read_temporary_tables_on_failure/test.py new file mode 100644 index 00000000000..ad1a41b8979 --- /dev/null +++ b/dbms/tests/integration/test_read_temporary_tables_on_failure/test.py @@ -0,0 +1,26 @@ +import pytest +import time + +from helpers.cluster import ClickHouseCluster +from helpers.client import QueryTimeoutExceedException, QueryRuntimeException + +cluster = ClickHouseCluster(__file__) + +node = cluster.add_instance('node') + 
+@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + + yield cluster + finally: + cluster.shutdown() + +def test_different_versions(start_cluster): + with pytest.raises(QueryTimeoutExceedException): + node.query("SELECT sleep(3)", timeout=1) + with pytest.raises(QueryRuntimeException): + node.query("SELECT 1", settings={'max_concurrent_queries_for_user': 1}) + assert node.contains_in_log('Too many simultaneous queries for user') + assert not node.contains_in_log('Unknown packet') diff --git a/dbms/tests/integration/test_system_merges/__init__.py b/dbms/tests/integration/test_system_merges/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_system_merges/configs/config.d/cluster.xml b/dbms/tests/integration/test_system_merges/configs/config.d/cluster.xml new file mode 100644 index 00000000000..ec7c9b8e4f8 --- /dev/null +++ b/dbms/tests/integration/test_system_merges/configs/config.d/cluster.xml @@ -0,0 +1,16 @@ + + + + + + node1 + 9000 + + + node2 + 9000 + + + + + \ No newline at end of file diff --git a/dbms/tests/integration/test_system_merges/configs/logs_config.xml b/dbms/tests/integration/test_system_merges/configs/logs_config.xml new file mode 100644 index 00000000000..bdf1bbc11c1 --- /dev/null +++ b/dbms/tests/integration/test_system_merges/configs/logs_config.xml @@ -0,0 +1,17 @@ + + 3 + + trace + /var/log/clickhouse-server/log.log + /var/log/clickhouse-server/log.err.log + 1000M + 10 + /var/log/clickhouse-server/stderr.log + /var/log/clickhouse-server/stdout.log + + + system + part_log
+ 500 +
+
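The two tests below poll system.merges while a merge (or mutation) is in flight, checking the path columns introduced by this PR. The core of that check boils down to a query like this (a sketch; actual values depend on the parts being merged):

```sql
SELECT source_part_names, source_part_paths, result_part_name, result_part_path
FROM system.merges
WHERE table = 'test_merge_simple';
```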
diff --git a/dbms/tests/integration/test_system_merges/test.py b/dbms/tests/integration/test_system_merges/test.py new file mode 100644 index 00000000000..7b638ce05c7 --- /dev/null +++ b/dbms/tests/integration/test_system_merges/test.py @@ -0,0 +1,160 @@ +import pytest +import threading +import time +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance('node1', + config_dir='configs', + main_configs=['configs/logs_config.xml'], + with_zookeeper=True, + macros={"shard": 0, "replica": 1} ) + +node2 = cluster.add_instance('node2', + config_dir='configs', + main_configs=['configs/logs_config.xml'], + with_zookeeper=True, + macros={"shard": 0, "replica": 2} ) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def split_tsv(data): + return [ x.split("\t") for x in data.splitlines() ] + + +@pytest.mark.parametrize("replicated", [ + "", + "replicated" +]) +def test_merge_simple(started_cluster, replicated): + try: + clickhouse_path = "/var/lib/clickhouse" + name = "test_merge_simple" + nodes = [node1, node2] if replicated else [node1] + engine = "ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')" if replicated else "MergeTree()" + node_check = nodes[-1] + starting_block = 0 if replicated else 1 + + for node in nodes: + node.query(""" + CREATE TABLE {name} + ( + `a` Int64 + ) + ENGINE = {engine} + ORDER BY sleep(2) + """.format(engine=engine, name=name)) + + node1.query("INSERT INTO {name} VALUES (1)".format(name=name)) + node1.query("INSERT INTO {name} VALUES (2)".format(name=name)) + node1.query("INSERT INTO {name} VALUES (3)".format(name=name)) + + parts = ["all_{}_{}_0".format(x, x) for x in range(starting_block, starting_block+3)] + result_part = "all_{}_{}_1".format(starting_block, starting_block+2) + + def optimize(): + node1.query("OPTIMIZE TABLE {name}".format(name=name)) + + wait = threading.Thread(target=time.sleep, args=(5,)) + wait.start() + t = threading.Thread(target=optimize) + t.start() + + time.sleep(1) + assert split_tsv(node_check.query(""" + SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation + FROM system.merges + WHERE table = '{name}' + """.format(name=name))) == [ + [ + "default", + name, + "3", + "['{}','{}','{}']".format(*parts), + "['{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/']".format(*parts, clickhouse=clickhouse_path, name=name), + result_part, + "{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name), + "all", + "0" + ] + ] + t.join() + wait.join() + + assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == "" + + finally: + for node in nodes: + node.query("DROP TABLE {name}".format(name=name)) + + +@pytest.mark.parametrize("replicated", [ + "", + "replicated" +]) +def test_mutation_simple(started_cluster, replicated): + try: + clickhouse_path = "/var/lib/clickhouse" + name = "test_mutation_simple" + nodes = [node1, node2] if replicated else [node1] + engine = "ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')" if replicated else "MergeTree()" + node_check = nodes[-1] + starting_block = 0 if replicated else 1 + + for node in nodes: + node.query(""" + CREATE TABLE {name} + ( + `a` Int64 + ) + ENGINE = {engine} + ORDER BY tuple() + 
""".format(engine=engine, name=name)) + + node1.query("INSERT INTO {name} VALUES (1)".format(name=name)) + part = "all_{}_{}_0".format(starting_block, starting_block) + result_part = "all_{}_{}_0_{}".format(starting_block, starting_block, starting_block+1) + + def alter(): + node1.query("ALTER TABLE {name} UPDATE a = 42 WHERE sleep(2) OR 1".format(name=name)) + + t = threading.Thread(target=alter) + t.start() + + time.sleep(1) + assert split_tsv(node_check.query(""" + SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation + FROM system.merges + WHERE table = '{name}' + """.format(name=name))) == [ + [ + "default", + name, + "1", + "['{}']".format(part), + "['{clickhouse}/data/default/{name}/{}/']".format(part, clickhouse=clickhouse_path, name=name), + result_part, + "{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name), + "all", + "1" + ], + ] + t.join() + + time.sleep(1.5) + + assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == "" + + finally: + for node in nodes: + node.query("DROP TABLE {name}".format(name=name)) diff --git a/dbms/tests/performance/collations.xml b/dbms/tests/performance/collations.xml new file mode 100644 index 00000000000..9bc48d76bce --- /dev/null +++ b/dbms/tests/performance/collations.xml @@ -0,0 +1,25 @@ + + loop + + + + 5 + 10000 + + + 100 + 60000 + + + + + + + + + test.hits + + + SELECT Title FROM test.hits ORDER BY Title DESC LIMIT 1000, 10 + SELECT Title FROM test.hits ORDER BY Title DESC COLLATE 'tr' LIMIT 1000, 10 + diff --git a/dbms/tests/performance/grear_circle_dist.xml b/dbms/tests/performance/grear_circle_dist.xml deleted file mode 100644 index bb26605bd89..00000000000 --- a/dbms/tests/performance/grear_circle_dist.xml +++ /dev/null @@ -1,15 +0,0 @@ - - once - - - - 1000 - 10000 - - - - - SELECT count() FROM system.numbers WHERE NOT ignore(greatCircleDistance((rand() % 360) * 1. - 180, (number % 150) * 1.2 - 90, (number % 360) + toFloat64(rand()) / 4294967296 - 180, (rand() % 180) * 1. - 90)) - - SELECT count() FROM system.numbers WHERE NOT ignore(greatCircleDistance(55. + toFloat64(rand()) / 4294967296, 37. + toFloat64(rand()) / 4294967296, 55. + toFloat64(rand()) / 4294967296, 37. 
+ toFloat64(rand()) / 4294967296)) - diff --git a/dbms/tests/queries/0_stateless/00105_shard_collations.reference b/dbms/tests/queries/0_stateless/00105_shard_collations.reference index 3ff09ff2f2d..3780e7deb0c 100644 --- a/dbms/tests/queries/0_stateless/00105_shard_collations.reference +++ b/dbms/tests/queries/0_stateless/00105_shard_collations.reference @@ -1,15 +1,18 @@ +Русский (default) Ё А Я а я ё +Русский (ru) а А ё Ё я Я +Русский (ru distributed) а а А @@ -22,6 +25,7 @@ я Я Я +Türk (default) A A B @@ -132,6 +136,7 @@ z ı Ş ş +Türk (tr) a a A @@ -242,9 +247,62 @@ z z Z Z +english (default) +A +Q +Z +c +e +english (en_US) +A +c +e +Q +Z +english (en) +A +c +e +Q +Z +español (default) +F +J +z +Ñ +español (es) +F +J +Ñ +z +Український (default) +І +Б +ї +ґ +Український (uk) +Б +ґ +І +ї +Русский (ru group by) а 1 А 4 ё 3 Ё 6 я 2 Я 5 +ζ +0 +1 +0 +1 +10 +2 +3 +4 +5 +6 +7 +8 +9 diff --git a/dbms/tests/queries/0_stateless/00105_shard_collations.sql b/dbms/tests/queries/0_stateless/00105_shard_collations.sql index 174992419e2..a73c441cc19 100644 --- a/dbms/tests/queries/0_stateless/00105_shard_collations.sql +++ b/dbms/tests/queries/0_stateless/00105_shard_collations.sql @@ -1,6 +1,49 @@ +SELECT 'Русский (default)'; SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x ORDER BY x; + +SELECT 'Русский (ru)'; SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x ORDER BY x COLLATE 'ru'; + +SELECT 'Русский (ru distributed)'; SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x FROM remote('127.0.0.{2,3}', system, one) ORDER BY x COLLATE 'ru'; + +SELECT 'Türk (default)'; SELECT arrayJoin(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'ç', 'd', 'e', 'f', 'g', 'ğ', 'h', 'ı', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'ö', 'p', 'r', 's', 'ş', 't', 'u', 'ü', 'v', 'y', 'z', 'A', 'B', 'C', 'Ç', 'D', 'E', 'F', 'G', 'Ğ', 'H', 'I', 'İ', 'J', 'K', 'L', 'M', 'N', 'O', 'Ö', 'P', 'R', 'S', 'Ş', 'T', 'U', 'Ü', 'V', 'Y', 'Z']) AS x ORDER BY x; + +SELECT 'Türk (tr)'; SELECT arrayJoin(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'ç', 'd', 'e', 'f', 'g', 'ğ', 'h', 'ı', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'ö', 'p', 'r', 's', 'ş', 't', 'u', 'ü', 'v', 'y', 'z', 'A', 'B', 'C', 'Ç', 'D', 'E', 'F', 'G', 'Ğ', 'H', 'I', 'İ', 'J', 'K', 'L', 'M', 'N', 'O', 'Ö', 'P', 'R', 'S', 'Ş', 'T', 'U', 'Ü', 'V', 'Y', 'Z']) AS x ORDER BY x COLLATE 'tr'; + +SELECT 'english (default)'; +SELECT arrayJoin(['A', 'c', 'Z', 'Q', 'e']) AS x ORDER BY x; +SELECT 'english (en_US)'; +SELECT arrayJoin(['A', 'c', 'Z', 'Q', 'e']) AS x ORDER BY x COLLATE 'en_US'; +SELECT 'english (en)'; +SELECT arrayJoin(['A', 'c', 'Z', 'Q', 'e']) AS x ORDER BY x COLLATE 'en'; + +SELECT 'español (default)'; +SELECT arrayJoin(['F', 'z', 'J', 'Ñ']) as x ORDER BY x; +SELECT 'español (es)'; +SELECT arrayJoin(['F', 'z', 'J', 'Ñ']) as x ORDER BY x COLLATE 'es'; + +SELECT 'Український (default)'; +SELECT arrayJoin(['ґ', 'ї', 'І', 'Б']) as x ORDER BY x; +SELECT 'Український (uk)'; +SELECT arrayJoin(['ґ', 'ї', 'І', 'Б']) as x ORDER BY x COLLATE 'uk'; + +SELECT 'Русский (ru group by)'; SELECT x, n FROM (SELECT ['а', 'я', 'ё', 'А', 
'Я', 'Ё'] AS arr) ARRAY JOIN arr AS x, arrayEnumerate(arr) AS n ORDER BY x COLLATE 'ru', n; + +--- Const expression +SELECT 'ζ' as x ORDER BY x COLLATE 'el'; + +-- check order by const with collation +SELECT number FROM numbers(2) ORDER BY 'x' COLLATE 'el'; + +-- check const and non const columns in order +SELECT number FROM numbers(11) ORDER BY 'x', toString(number), 'y' COLLATE 'el'; + +--- Trash locales +SELECT '' as x ORDER BY x COLLATE 'qq'; --{serverError 186} +SELECT '' as x ORDER BY x COLLATE 'qwe'; --{serverError 186} +SELECT '' as x ORDER BY x COLLATE 'some_non_existing_locale'; --{serverError 186} +SELECT '' as x ORDER BY x COLLATE 'ру'; --{serverError 186} diff --git a/dbms/tests/queries/0_stateless/00918_json_functions.reference b/dbms/tests/queries/0_stateless/00918_json_functions.reference index 181da3dd3a0..32cde7bbfb4 100644 --- a/dbms/tests/queries/0_stateless/00918_json_functions.reference +++ b/dbms/tests/queries/0_stateless/00918_json_functions.reference @@ -166,3 +166,12 @@ d e u v +--JSONExtractArrayRaw-- +[] +[] +[] +['[]','[]'] +['-100','200','300'] +['1','2','3','4','5','"hello"'] +['1','2','3'] +['4','5','6'] diff --git a/dbms/tests/queries/0_stateless/00918_json_functions.sql b/dbms/tests/queries/0_stateless/00918_json_functions.sql index 4cb2445ca2a..0db9540377e 100644 --- a/dbms/tests/queries/0_stateless/00918_json_functions.sql +++ b/dbms/tests/queries/0_stateless/00918_json_functions.sql @@ -182,3 +182,12 @@ SELECT JSONExtractRaw('{"abc":"\\u263a"}', 'abc'); SELECT '--const/non-const mixed--'; SELECT JSONExtractString('["a", "b", "c", "d", "e"]', idx) FROM (SELECT arrayJoin([1,2,3,4,5]) AS idx); SELECT JSONExtractString(json, 's') FROM (SELECT arrayJoin(['{"s":"u"}', '{"s":"v"}']) AS json); + +SELECT '--JSONExtractArrayRaw--'; +SELECT JSONExtractArrayRaw(''); +SELECT JSONExtractArrayRaw('{"a": "hello", "b": "not_array"}'); +SELECT JSONExtractArrayRaw('[]'); +SELECT JSONExtractArrayRaw('[[],[]]'); +SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b'); +SELECT JSONExtractArrayRaw('[1,2,3,4,5,"hello"]'); +SELECT JSONExtractArrayRaw(arrayJoin(JSONExtractArrayRaw('[[1,2,3],[4,5,6]]'))); diff --git a/debian/clickhouse-server.postinst b/debian/clickhouse-server.postinst index c47a8ef4be2..4a1f4d9d387 100644 --- a/debian/clickhouse-server.postinst +++ b/debian/clickhouse-server.postinst @@ -4,6 +4,7 @@ set -e CLICKHOUSE_USER=${CLICKHOUSE_USER:=clickhouse} CLICKHOUSE_GROUP=${CLICKHOUSE_GROUP:=${CLICKHOUSE_USER}} +# Please note that we don't support paths with whitespaces. This is rather ignorant. CLICKHOUSE_CONFDIR=${CLICKHOUSE_CONFDIR:=/etc/clickhouse-server} CLICKHOUSE_DATADIR=${CLICKHOUSE_DATADIR:=/var/lib/clickhouse} CLICKHOUSE_LOGDIR=${CLICKHOUSE_LOGDIR:=/var/log/clickhouse-server} @@ -135,6 +136,8 @@ Please fix this and reinstall this package." 
diff --git a/docs/en/query_language/functions/json_functions.md b/docs/en/query_language/functions/json_functions.md
index 6ab942bd012..eeb41870112 100644
--- a/docs/en/query_language/functions/json_functions.md
+++ b/docs/en/query_language/functions/json_functions.md
@@ -206,4 +206,16 @@ Example:
 SELECT JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b') = '[-100, 200.0, 300]'
 ```
 
+## JSONExtractArrayRaw(json[, indices_or_keys]...)
+
+Returns an array with the elements of a JSON array, each represented as an unparsed string.
+
+If the part does not exist or isn't an array, an empty array is returned.
+
+Example:
+
+```sql
+SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, "hello"]}', 'b') = ['-100', '200.0', '"hello"']
+```
+
 [Original article](https://clickhouse.yandex/docs/en/query_language/functions/json_functions/)
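Because `JSONExtractArrayRaw` returns its elements as unparsed strings, it composes with `arrayJoin` and the typed `JSONExtract*` functions to take apart nested arrays. A hedged sketch of this two-step parse, mirroring the nested-array test case above:

```sql
-- Step 1: extract the inner arrays as raw strings and turn them into rows.
-- Step 2: parse each raw inner array with a typed extractor
-- (a positive index addresses the n-th element, 1-based).
SELECT
    elem,                                   -- '[1,2,3]', then '[4,5,6]'
    JSONExtractInt(elem, 1) AS first_value  -- 1, then 4
FROM
(
    SELECT arrayJoin(JSONExtractArrayRaw('[[1,2,3],[4,5,6]]')) AS elem
);
```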
diff --git a/docs/en/query_language/functions/other_functions.md b/docs/en/query_language/functions/other_functions.md
index f6139741849..394cd78c0f3 100644
--- a/docs/en/query_language/functions/other_functions.md
+++ b/docs/en/query_language/functions/other_functions.md
@@ -4,8 +4,39 @@
 Returns a string with the name of the host that this function was performed on. For distributed processing, this is the name of the remote server host, if the function is performed on a remote server.
 
-## FQDN(), fullHostName()
-Returns the Fully qualified domain name aka [FQDN](https://en.wikipedia.org/wiki/Fully_qualified_domain_name).
+## FQDN {#fqdn}
+
+Returns the fully qualified domain name.
+
+**Syntax**
+
+```sql
+fqdn();
+```
+
+This function is case-insensitive.
+
+**Returned value**
+
+- String with the fully qualified domain name.
+
+Type: `String`.
+
+**Example**
+
+Query:
+
+```sql
+SELECT FQDN();
+```
+
+Result:
+
+```text
+┌─FQDN()──────────────────────────┐
+│ clickhouse.ru-central1.internal │
+└─────────────────────────────────┘
+```
 
 ## basename
diff --git a/docs/ru/query_language/functions/json_functions.md b/docs/ru/query_language/functions/json_functions.md
index 49f575f4b78..9269493473b 100644
--- a/docs/ru/query_language/functions/json_functions.md
+++ b/docs/ru/query_language/functions/json_functions.md
@@ -199,9 +199,9 @@ SELECT JSONExtractKeysAndValues('{"x": {"a": 5, "b": 7, "c": 11}}', 'x', 'Int8')
 ## JSONExtractRaw(json[, indices_or_keys]...)
 
-Возвращает часть JSON.
+Возвращает часть JSON в виде строки, содержащей неразобранную подстроку.
 
-Если значение не существует или имеет неверный тип, то возвращается пустая строка.
+Если значение не существует, то возвращается пустая строка.
 
 Пример:
 
@@ -209,4 +209,16 @@ SELECT JSONExtractKeysAndValues('{"x": {"a": 5, "b": 7, "c": 11}}', 'x', 'Int8')
 SELECT JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b') = '[-100, 200.0, 300]'
 ```
 
+## JSONExtractArrayRaw(json[, indices_or_keys]...)
+
+Возвращает массив из элементов JSON массива, каждый из которых представлен в виде строки с неразобранными подстроками из JSON.
+
+Если значение не существует или не является массивом, то возвращается пустой массив.
+
+Пример:
+
+```sql
+SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, "hello"]}', 'b') = ['-100', '200.0', '"hello"']
+```
+
 [Оригинальная статья](https://clickhouse.yandex/docs/ru/query_language/functions/json_functions/)
diff --git a/docs/ru/query_language/functions/other_functions.md b/docs/ru/query_language/functions/other_functions.md
index a7e6051e541..da47839d3ca 100644
--- a/docs/ru/query_language/functions/other_functions.md
+++ b/docs/ru/query_language/functions/other_functions.md
@@ -4,6 +4,40 @@
 Возвращает строку - имя хоста, на котором эта функция была выполнена. При распределённой обработке запроса, это будет имя хоста удалённого сервера, если функция выполняется на удалённом сервере.
 
+## FQDN {#fqdn}
+
+Возвращает полное имя домена.
+
+**Синтаксис**
+
+```sql
+fqdn();
+```
+
+Эта функция регистронезависимая.
+
+**Возвращаемое значение**
+
+- Полное имя домена.
+
+Тип: `String`.
+
+**Пример**
+
+Запрос:
+
+```sql
+SELECT FQDN();
+```
+
+Ответ:
+
+```text
+┌─FQDN()──────────────────────────┐
+│ clickhouse.ru-central1.internal │
+└─────────────────────────────────┘
+```
+
 ## basename
 
 Извлекает конечную часть строки после последнего слэша или бэкслэша. Функция часто используется для извлечения имени файла из пути.
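Both language versions document `FQDN()` in the same shape. To contrast it with `hostName()`, described at the top of these files, a minimal sketch (the host names in the comment are illustrative, following the documentation example):

```sql
-- hostName() returns the name of the host executing the query;
-- FQDN() returns the fully qualified domain name of the same host.
SELECT hostName(), FQDN();
-- ┌─hostName()─┬─FQDN()──────────────────────────┐
-- │ clickhouse │ clickhouse.ru-central1.internal │
-- └────────────┴─────────────────────────────────┘
```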
diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt
index b3df25d13e6..fcf56e82b52 100644
--- a/utils/CMakeLists.txt
+++ b/utils/CMakeLists.txt
@@ -21,7 +21,6 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
     add_subdirectory (corrector_utf8)
     add_subdirectory (zookeeper-cli)
     add_subdirectory (zookeeper-dump-tree)
-    add_subdirectory (zookeeper-copy-tree)
    add_subdirectory (zookeeper-remove-by-list)
     add_subdirectory (zookeeper-create-entry-to-download-part)
     add_subdirectory (zookeeper-adjust-block-numbers-to-parts)
diff --git a/utils/zookeeper-copy-tree/CMakeLists.txt b/utils/zookeeper-copy-tree/CMakeLists.txt
deleted file mode 100644
index c4dc88d700c..00000000000
--- a/utils/zookeeper-copy-tree/CMakeLists.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-add_executable (zookeeper-copy-tree main.cpp ${SRCS})
-target_link_libraries(zookeeper-copy-tree PRIVATE clickhouse_common_zookeeper clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
diff --git a/utils/zookeeper-copy-tree/main.cpp b/utils/zookeeper-copy-tree/main.cpp
deleted file mode 100644
index 7bc7316b4af..00000000000
--- a/utils/zookeeper-copy-tree/main.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-#include <Common/ZooKeeper/ZooKeeper.h>
-#include <Common/ZooKeeper/KeeperException.h>
-#include <Common/Exception.h>
-
-#include <boost/program_options.hpp>
-
-#include <list>
-
-namespace DB
-{
-namespace ErrorCodes
-{
-
-extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
-
-}
-}
-
-int main(int argc, char ** argv)
-try
-{
-    boost::program_options::options_description desc("Allowed options");
-    desc.add_options()
-        ("help,h", "produce help message")
-        ("from", boost::program_options::value<std::string>()->required(),
-            "addresses of source ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181")
-        ("from-path", boost::program_options::value<std::string>()->required(),
-            "where to copy from")
-        ("to", boost::program_options::value<std::string>()->required(),
-            "addresses of destination ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181")
-        ("to-path", boost::program_options::value<std::string>()->required(),
-            "where to copy to")
-    ;
-
-    boost::program_options::variables_map options;
-    boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
-
-    if (options.count("help"))
-    {
-        std::cout << "Copy a ZooKeeper tree to another cluster." << std::endl;
-        std::cout << "Usage: " << argv[0] << " [options]" << std::endl;
-        std::cout << "WARNING: it is almost useless as it is impossible to correctly copy sequential nodes" << std::endl;
-        std::cout << desc << std::endl;
-        return 1;
-    }
-
-    zkutil::ZooKeeper from_zookeeper(options.at("from").as<std::string>());
-    zkutil::ZooKeeper to_zookeeper(options.at("to").as<std::string>());
-
-    std::string from_path = options.at("from-path").as<std::string>();
-    std::string to_path = options.at("to-path").as<std::string>();
-
-    if (to_zookeeper.exists(to_path))
-        throw DB::Exception("Destination path: " + to_path + " already exists, aborting.",
-            DB::ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER);
-
-    struct Node
-    {
-        Node(
-            std::string path_,
-            std::future<Coordination::GetResponse> get_future_,
-            std::future<Coordination::ListResponse> children_future_,
-            Node * parent_)
-            : path(std::move(path_))
-            , get_future(std::move(get_future_))
-            , children_future(std::move(children_future_))
-            , parent(parent_)
-        {
-        }
-
-        std::string path;
-        std::future<Coordination::GetResponse> get_future;
-        std::future<Coordination::ListResponse> children_future;
-
-        Node * parent = nullptr;
-        std::future<Coordination::CreateResponse> create_future;
-        bool created = false;
-        bool deleted = false;
-        bool ephemeral = false;
-    };
-
-    std::list<Node> nodes_queue;
-    nodes_queue.emplace_back(
-        from_path, from_zookeeper.asyncGet(from_path), from_zookeeper.asyncGetChildren(from_path), nullptr);
-
-    to_zookeeper.createAncestors(to_path);
-
-    for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it)
-    {
-        Coordination::GetResponse get_response;
-        Coordination::ListResponse children_response;
-        try
-        {
-            get_response = it->get_future.get();
-            children_response = it->children_future.get();
-        }
-        catch (const Coordination::Exception & e)
-        {
-            if (e.code == Coordination::ZNONODE)
-            {
-                it->deleted = true;
-                continue;
-            }
-            throw;
-        }
-
-        if (get_response.stat.ephemeralOwner)
-        {
-            it->ephemeral = true;
-            continue;
-        }
-
-        if (it->parent && !it->parent->created)
-        {
-            it->parent->create_future.get();
-            it->parent->created = true;
-            std::cerr << it->parent->path << " copied!" << std::endl;
-        }
-
-        std::string new_path = it->path;
-        new_path.replace(0, from_path.length(), to_path);
-        it->create_future = to_zookeeper.asyncCreate(new_path, get_response.data, zkutil::CreateMode::Persistent);
-        get_response.data.clear();
-        get_response.data.shrink_to_fit();
-
-        for (const auto & name : children_response.names)
-        {
-            std::string child_path = it->path == "/" ? it->path + name : it->path + '/' + name;
-            nodes_queue.emplace_back(
-                child_path, from_zookeeper.asyncGet(child_path), from_zookeeper.asyncGetChildren(child_path),
-                &(*it));
-        }
-    }
-
-    for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it)
-    {
-        if (!it->created && !it->deleted && !it->ephemeral)
-        {
-            it->create_future.get();
-            it->created = true;
-            std::cerr << it->path << " copied!" << std::endl;
-        }
-    }
-}
-catch (...)
-{
-    std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
-    throw;
-}