diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4cda4eac33e..78f559c8eb8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -920,7 +920,7 @@ jobs: - BuilderDebMsan - BuilderDebDebug runs-on: [self-hosted, style-checker] - if: always() + if: ${{ success() || failure() }} steps: - name: Set envs run: | @@ -960,7 +960,7 @@ jobs: - BuilderBinDarwinAarch64 - BuilderBinPPC64 runs-on: [self-hosted, style-checker] - if: always() + if: ${{ success() || failure() }} steps: - name: Set envs run: | diff --git a/.github/workflows/woboq.yml b/.github/workflows/woboq.yml new file mode 100644 index 00000000000..f3cd7ab6245 --- /dev/null +++ b/.github/workflows/woboq.yml @@ -0,0 +1,42 @@ +name: WoboqBuilder +env: + # Force the stdout and stderr streams to be unbuffered + PYTHONUNBUFFERED: 1 + +concurrency: + group: woboq +on: # yamllint disable-line rule:truthy + schedule: + - cron: '0 */18 * * *' + workflow_dispatch: +jobs: + # don't use dockerhub push because this image updates so rarely + WoboqCodebrowser: + runs-on: [self-hosted, style-checker] + steps: + - name: Set envs + run: | + cat >> "$GITHUB_ENV" << 'EOF' + TEMP_PATH=${{runner.temp}}/codebrowser + REPO_COPY=${{runner.temp}}/codebrowser/ClickHouse + IMAGES_PATH=${{runner.temp}}/images_path + EOF + - name: Clear repository + run: | + sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE + - name: Check out repository code + uses: actions/checkout@v2 + with: + submodules: 'true' + - name: Codebrowser + run: | + sudo rm -fr $TEMP_PATH + mkdir -p $TEMP_PATH + cp -r $GITHUB_WORKSPACE $TEMP_PATH + cd $REPO_COPY/tests/ci && python3 codebrowser_check.py + - name: Cleanup + if: always() + run: | + docker kill $(docker ps -q) ||: + docker rm -f $(docker ps -a -q) ||: + sudo rm -fr $TEMP_PATH diff --git a/PreLoad.cmake b/PreLoad.cmake index 9fba896d72e..46bf8efed31 100644 --- a/PreLoad.cmake +++ b/PreLoad.cmake @@ -27,8 +27,7 @@ execute_process(COMMAND uname -m OUTPUT_VARIABLE ARCH) if (OS MATCHES "Linux" AND NOT DEFINED CMAKE_TOOLCHAIN_FILE AND NOT DISABLE_HERMETIC_BUILD - AND ($ENV{CC} MATCHES ".*clang.*" OR CMAKE_C_COMPILER MATCHES ".*clang.*") - AND (USE_STATIC_LIBRARIES OR NOT DEFINED USE_STATIC_LIBRARIES)) + AND ($ENV{CC} MATCHES ".*clang.*" OR CMAKE_C_COMPILER MATCHES ".*clang.*")) if (ARCH MATCHES "amd64|x86_64") set (CMAKE_TOOLCHAIN_FILE "cmake/linux/toolchain-x86_64.cmake" CACHE INTERNAL "" FORCE) diff --git a/base/mysqlxx/Row.cpp b/base/mysqlxx/Row.cpp index aecec46e519..861a04f8ece 100644 --- a/base/mysqlxx/Row.cpp +++ b/base/mysqlxx/Row.cpp @@ -21,4 +21,12 @@ Value Row::operator[] (const char * name) const throw Exception(std::string("Unknown column ") + name); } +enum enum_field_types Row::getFieldType(size_t i) +{ + if (i >= res->getNumFields()) + throw Exception(std::string("Array Index Overflow")); + MYSQL_FIELDS fields = res->getFields(); + return fields[i].type; +} + } diff --git a/base/mysqlxx/Row.h b/base/mysqlxx/Row.h index d668fdbd29a..b11d7d628ef 100644 --- a/base/mysqlxx/Row.h +++ b/base/mysqlxx/Row.h @@ -79,6 +79,8 @@ public: */ operator private_bool_type() const { return row == nullptr ? 
nullptr : &Row::row; } + enum enum_field_types getFieldType(size_t i); + private: MYSQL_ROW row{}; ResultBase * res{}; diff --git a/base/mysqlxx/Types.h b/base/mysqlxx/Types.h index b5ed70916fa..5fd9aa8bbc8 100644 --- a/base/mysqlxx/Types.h +++ b/base/mysqlxx/Types.h @@ -16,6 +16,8 @@ using MYSQL_ROW = char**; struct st_mysql_field; using MYSQL_FIELD = st_mysql_field; +enum struct enum_field_types; + #endif namespace mysqlxx diff --git a/cmake/linux/toolchain-x86_64.cmake b/cmake/linux/toolchain-x86_64.cmake index 879f35feb83..965ea024ab7 100644 --- a/cmake/linux/toolchain-x86_64.cmake +++ b/cmake/linux/toolchain-x86_64.cmake @@ -14,9 +14,12 @@ set (TOOLCHAIN_PATH "${CMAKE_CURRENT_LIST_DIR}/../../contrib/sysroot/linux-x86_6 set (CMAKE_SYSROOT "${TOOLCHAIN_PATH}/x86_64-linux-gnu/libc") -set (CMAKE_C_FLAGS_INIT "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") -set (CMAKE_CXX_FLAGS_INIT "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") -set (CMAKE_ASM_FLAGS_INIT "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") +set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") +set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") +set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") +set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") +set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") +set (CMAKE_MODULE_LINKER_FLAGS "${CMAKE_MODULE_LINKER_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}") set (HAS_PRE_1970_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE) set (HAS_PRE_1970_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE) diff --git a/contrib/cassandra b/contrib/cassandra index eb9b68dadbb..f4a31e92a25 160000 --- a/contrib/cassandra +++ b/contrib/cassandra @@ -1 +1 @@ -Subproject commit eb9b68dadbb4417a2c132ad4a1c2fa76e65e6fc1 +Subproject commit f4a31e92a25c34c02c7291ff97c7813bc83b0e09 diff --git a/contrib/sysroot b/contrib/sysroot index 410845187f5..bbcac834526 160000 --- a/contrib/sysroot +++ b/contrib/sysroot @@ -1 +1 @@ -Subproject commit 410845187f582c5e6692b53dddbe43efbb728734 +Subproject commit bbcac834526d90d1e764164b861be426891d1743 diff --git a/docker/test/codebrowser/Dockerfile b/docker/test/codebrowser/Dockerfile index 25fabca67b5..d1059b3dacc 100644 --- a/docker/test/codebrowser/Dockerfile +++ b/docker/test/codebrowser/Dockerfile @@ -6,7 +6,7 @@ FROM clickhouse/binary-builder ARG apt_archive="http://archive.ubuntu.com" RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list -RUN apt-get update && apt-get --yes --allow-unauthenticated install clang-9 libllvm9 libclang-9-dev +RUN apt-get update && apt-get --yes --allow-unauthenticated install clang-13 libllvm13 libclang-13-dev # repo versions doesn't work correctly with C++17 # also we push reports to s3, so we add index.html to subfolder urls @@ -23,12 +23,12 @@ ENV SOURCE_DIRECTORY=/repo_folder ENV BUILD_DIRECTORY=/build ENV HTML_RESULT_DIRECTORY=$BUILD_DIRECTORY/html_report ENV SHA=nosha -ENV DATA="data" +ENV DATA="https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/data" CMD mkdir -p $BUILD_DIRECTORY && cd $BUILD_DIRECTORY && \ cmake $SOURCE_DIRECTORY -DCMAKE_CXX_COMPILER=/usr/bin/clang\+\+-13 -DCMAKE_C_COMPILER=/usr/bin/clang-13 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON -DENABLE_EMBEDDED_COMPILER=0 -DENABLE_S3=0 && \ mkdir -p $HTML_RESULT_DIRECTORY && \ $CODEGEN -b $BUILD_DIRECTORY -a -o $HTML_RESULT_DIRECTORY -p 
ClickHouse:$SOURCE_DIRECTORY:$SHA -d $DATA | ts '%Y-%m-%d %H:%M:%S' && \ cp -r $STATIC_DATA $HTML_RESULT_DIRECTORY/ &&\ - $CODEINDEX $HTML_RESULT_DIRECTORY -d $DATA | ts '%Y-%m-%d %H:%M:%S' && \ + $CODEINDEX $HTML_RESULT_DIRECTORY -d "$DATA" | ts '%Y-%m-%d %H:%M:%S' && \ mv $HTML_RESULT_DIRECTORY /test_output diff --git a/docs/en/engines/database-engines/materialized-mysql.md b/docs/en/engines/database-engines/materialized-mysql.md index 944264b68a3..b8b49634735 100644 --- a/docs/en/engines/database-engines/materialized-mysql.md +++ b/docs/en/engines/database-engines/materialized-mysql.md @@ -83,6 +83,7 @@ When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree]( | VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) | | BLOB | [String](../../sql-reference/data-types/string.md) | | BINARY | [FixedString](../../sql-reference/data-types/fixedstring.md) | +| BIT | [UInt64](../../sql-reference/data-types/int-uint.md) | [Nullable](../../sql-reference/data-types/nullable.md) is supported. diff --git a/docs/en/operations/external-authenticators/kerberos.md b/docs/en/operations/external-authenticators/kerberos.md index 2e2a88dc7a8..da84c1f6a89 100644 --- a/docs/en/operations/external-authenticators/kerberos.md +++ b/docs/en/operations/external-authenticators/kerberos.md @@ -14,11 +14,11 @@ To enable Kerberos, one should include `kerberos` section in `config.xml`. This #### Parameters: - `principal` - canonical service principal name that will be acquired and used when accepting security contexts. - - This parameter is optional, if omitted, the default principal will be used. + - This parameter is optional, if omitted, the default principal will be used. - `realm` - a realm, that will be used to restrict authentication to only those requests whose initiator's realm matches it. - - This parameter is optional, if omitted, no additional filtering by realm will be applied. + - This parameter is optional, if omitted, no additional filtering by realm will be applied. Example (goes into `config.xml`): @@ -75,7 +75,7 @@ In order to enable Kerberos authentication for the user, specify `kerberos` sect Parameters: - `realm` - a realm that will be used to restrict authentication to only those requests whose initiator's realm matches it. - - This parameter is optional, if omitted, no additional filtering by realm will be applied. + - This parameter is optional, if omitted, no additional filtering by realm will be applied. Example (goes into `users.xml`): diff --git a/docs/en/sql-reference/statements/create/role.md b/docs/en/sql-reference/statements/create/role.md index 4723613aeef..e0e58f7a0f6 100644 --- a/docs/en/sql-reference/statements/create/role.md +++ b/docs/en/sql-reference/statements/create/role.md @@ -31,7 +31,7 @@ CREATE ROLE accountant; GRANT SELECT ON db.* TO accountant; ``` -This sequence of queries creates the role `accountant` that has the privilege of reading data from the `accounting` database. +This sequence of queries creates the role `accountant` that has the privilege of reading data from the `db` database. 
Assigning the role to the user `mira`: diff --git a/src/Columns/ColumnFixedString.h b/src/Columns/ColumnFixedString.h index 95b554ea6fa..f813ef47f21 100644 --- a/src/Columns/ColumnFixedString.h +++ b/src/Columns/ColumnFixedString.h @@ -175,6 +175,11 @@ public: chars.reserve(n * size); } + void resize(size_t size) + { + chars.resize(n * size); + } + void getExtremes(Field & min, Field & max) const override; bool structureEquals(const IColumn & rhs) const override diff --git a/src/Columns/ColumnStringHelpers.h b/src/Columns/ColumnStringHelpers.h new file mode 100644 index 00000000000..851486e490a --- /dev/null +++ b/src/Columns/ColumnStringHelpers.h @@ -0,0 +1,91 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int TOO_LARGE_STRING_SIZE; +} + +namespace ColumnStringHelpers +{ + +/** Simplifies writing data to the ColumnString or ColumnFixedString via WriteBuffer. + * + * Take care of little subtle details, like padding or proper offsets. + */ +template +class WriteHelper +{ + ColumnType & col; + WriteBufferFromVector buffer; + size_t prev_row_buffer_size = 0; + + static ColumnType & resizeColumn(ColumnType & column, size_t rows) + { + if constexpr (std::is_same_v) + column.resize(rows); + else + { + column.getOffsets().reserve(rows); + /// Using coefficient 2 for initial size is arbitrary. + column.getChars().resize(rows * 2); + } + return column; + } + +public: + WriteHelper(ColumnType & col_, size_t expected_rows) + : col(resizeColumn(col_, expected_rows)) + , buffer(col.getChars()) + {} + + ~WriteHelper() = default; + + void finalize() + { + buffer.finalize(); + } + + auto & getWriteBuffer() + { + return buffer; + } + + inline void rowWritten() + { + if constexpr (std::is_same_v) + { + if (buffer.count() > prev_row_buffer_size + col.getN()) + throw Exception( + ErrorCodes::TOO_LARGE_STRING_SIZE, + "Too large string for FixedString column"); + + // Pad with zeroes on the right to maintain FixedString invariant. + const auto excess_bytes = buffer.count() % col.getN(); + const auto fill_bytes = col.getN() - excess_bytes; + writeChar(0, fill_bytes, buffer); + } + else + { + writeChar(0, buffer); + col.getOffsets().push_back(buffer.count()); + } + + prev_row_buffer_size = buffer.count(); + } +}; + +} + +} diff --git a/src/Common/Config/ConfigProcessor.cpp b/src/Common/Config/ConfigProcessor.cpp index da7405b993f..0bb25df5d4e 100644 --- a/src/Common/Config/ConfigProcessor.cpp +++ b/src/Common/Config/ConfigProcessor.cpp @@ -41,24 +41,6 @@ namespace ErrorCodes /// For cutting preprocessed path to this base static std::string main_config_path; -/// Extracts from a string the first encountered number consisting of at least two digits. 
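The new ColumnStringHelpers::WriteHelper introduced above pads every FixedString row with trailing zero bytes so that each row occupies exactly N bytes of the flat character buffer. Below is a minimal, self-contained sketch of that padding arithmetic only, using a plain std::vector<char> in place of the real ColumnFixedString / WriteBufferFromVector classes; the function and variable names are illustrative, not the actual ClickHouse API, and the extra `% n` is my own guard for an exactly-full row.

```cpp
#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string_view>
#include <vector>

/// Append one row of at most `n` bytes to a FixedString-like flat buffer,
/// zero-padding on the right so the row occupies exactly `n` bytes
/// (the same excess_bytes / fill_bytes arithmetic as in rowWritten()).
void appendFixedStringRow(std::vector<char> & chars, std::string_view value, size_t n)
{
    if (value.size() > n)
        throw std::runtime_error("Too large string for FixedString column");

    chars.insert(chars.end(), value.begin(), value.end());

    /// Every previous row ends on a multiple of n, so size() % n is what the current row wrote.
    const size_t excess_bytes = chars.size() % n;
    const size_t fill_bytes = (n - excess_bytes) % n;   /// the extra % n skips padding for exactly-full rows
    chars.insert(chars.end(), fill_bytes, '\0');
}

int main()
{
    std::vector<char> chars;
    appendFixedStringRow(chars, "ab", 4);
    appendFixedStringRow(chars, "wxyz", 4);
    std::cout << chars.size() << '\n';   /// 8: two rows of exactly 4 bytes each
}
```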
-static std::string numberFromHost(const std::string & s) -{ - for (size_t i = 0; i < s.size(); ++i) - { - std::string res; - size_t j = i; - while (j < s.size() && isNumericASCII(s[j])) - res += s[j++]; - if (res.size() >= 2) - { - while (res[0] == '0') - res.erase(res.begin()); - return res; - } - } - return ""; -} bool ConfigProcessor::isPreprocessedFile(const std::string & path) { @@ -245,19 +227,6 @@ void ConfigProcessor::merge(XMLDocumentPtr config, XMLDocumentPtr with) mergeRecursive(config, config_root, with_root); } -static std::string layerFromHost() -{ - struct utsname buf; - if (uname(&buf)) - throw Poco::Exception(std::string("uname failed: ") + errnoToString(errno)); - - std::string layer = numberFromHost(buf.nodename); - if (layer.empty()) - throw Poco::Exception(std::string("no layer in host name: ") + buf.nodename); - - return layer; -} - void ConfigProcessor::doIncludesRecursive( XMLDocumentPtr config, XMLDocumentPtr include_from, @@ -288,18 +257,6 @@ void ConfigProcessor::doIncludesRecursive( if (node->nodeType() != Node::ELEMENT_NODE) return; - /// Substitute for the number extracted from the hostname only if there is an - /// empty tag without attributes in the original file. - if (node->nodeName() == "layer" - && !node->hasAttributes() - && !node->hasChildNodes() - && node->nodeValue().empty()) - { - NodePtr new_node = config->createTextNode(layerFromHost()); - node->appendChild(new_node); - return; - } - std::map attr_nodes; NamedNodeMapPtr attributes = node->attributes(); size_t substs_count = 0; diff --git a/src/Common/Config/ConfigProcessor.h b/src/Common/Config/ConfigProcessor.h index 2a92a709934..04278d72303 100644 --- a/src/Common/Config/ConfigProcessor.h +++ b/src/Common/Config/ConfigProcessor.h @@ -59,7 +59,6 @@ public: /// 4) If zk_node_cache is non-NULL, replace elements matching the "" pattern with /// "contents of the /bar ZooKeeper node". /// If has_zk_includes is non-NULL and there are such elements, set has_zk_includes to true. - /// 5) (Yandex.Metrika-specific) Substitute "" with "layer number from the hostname". 
XMLDocumentPtr processConfig( bool * has_zk_includes = nullptr, zkutil::ZooKeeperNodeCache * zk_node_cache = nullptr, diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 982523a3ef2..9c4f524a322 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -259,6 +259,8 @@ M(RemoteFSUnusedPrefetches, "Number of prefetches pending at buffer destruction") \ M(RemoteFSPrefetchedReads, "Number of reads from prefecthed buffer") \ M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \ + M(RemoteFSLazySeeks, "Number of lazy seeks") \ + M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \ M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \ \ M(ReadBufferSeekCancelConnection, "Number of seeks which lead to new connection (s3, http)") \ diff --git a/src/Core/MySQL/MySQLReplication.cpp b/src/Core/MySQL/MySQLReplication.cpp index 663d246b0d3..78554a08d07 100644 --- a/src/Core/MySQL/MySQLReplication.cpp +++ b/src/Core/MySQL/MySQLReplication.cpp @@ -230,6 +230,7 @@ namespace MySQLReplication pos += 2; break; } + case MYSQL_TYPE_BIT: case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: { /// Little-Endian @@ -584,6 +585,15 @@ namespace MySQLReplication } break; } + case MYSQL_TYPE_BIT: + { + UInt32 bits = ((meta >> 8) * 8) + (meta & 0xff); + UInt32 size = (bits + 7) / 8; + UInt64 val = 0UL; + readBigEndianStrict(payload, reinterpret_cast(&val), size); + row.push_back(val); + break; + } case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: { diff --git a/src/DataTypes/DataTypeDecimalBase.h b/src/DataTypes/DataTypeDecimalBase.h index c0585095eeb..dc8c99b06bc 100644 --- a/src/DataTypes/DataTypeDecimalBase.h +++ b/src/DataTypes/DataTypeDecimalBase.h @@ -59,6 +59,7 @@ class DataTypeDecimalBase : public IDataType public: using FieldType = T; using ColumnType = ColumnDecimal; + static constexpr auto type_id = TypeId; static constexpr bool is_parametric = true; diff --git a/src/DataTypes/DataTypeEnum.h b/src/DataTypes/DataTypeEnum.h index 92c72b87afa..2f607fc2aa6 100644 --- a/src/DataTypes/DataTypeEnum.h +++ b/src/DataTypes/DataTypeEnum.h @@ -38,6 +38,7 @@ class DataTypeEnum final : public IDataTypeEnum, public EnumValues public: using FieldType = Type; using ColumnType = ColumnVector; + static constexpr auto type_id = sizeof(FieldType) == 1 ? TypeIndex::Enum8 : TypeIndex::Enum16; using typename EnumValues::Values; static constexpr bool is_parametric = true; @@ -52,7 +53,7 @@ public: std::string doGetName() const override { return type_name; } const char * getFamilyName() const override; - TypeIndex getTypeId() const override { return sizeof(FieldType) == 1 ? 
TypeIndex::Enum8 : TypeIndex::Enum16; } + TypeIndex getTypeId() const override { return type_id; } FieldType readValue(ReadBuffer & istr) const { diff --git a/src/DataTypes/DataTypeFixedString.h b/src/DataTypes/DataTypeFixedString.h index f88d2f5337a..a53fde42b29 100644 --- a/src/DataTypes/DataTypeFixedString.h +++ b/src/DataTypes/DataTypeFixedString.h @@ -10,6 +10,8 @@ namespace DB { +class ColumnFixedString; + namespace ErrorCodes { extern const int ARGUMENT_OUT_OF_BOUND; @@ -22,7 +24,10 @@ private: size_t n; public: + using ColumnType = ColumnFixedString; + static constexpr bool is_parametric = true; + static constexpr auto type_id = TypeIndex::FixedString; DataTypeFixedString(size_t n_) : n(n_) { @@ -33,7 +38,7 @@ public: } std::string doGetName() const override; - TypeIndex getTypeId() const override { return TypeIndex::FixedString; } + TypeIndex getTypeId() const override { return type_id; } const char * getFamilyName() const override { return "FixedString"; } diff --git a/src/DataTypes/DataTypeNumberBase.h b/src/DataTypes/DataTypeNumberBase.h index 95975051600..59dc26ed13a 100644 --- a/src/DataTypes/DataTypeNumberBase.h +++ b/src/DataTypes/DataTypeNumberBase.h @@ -20,6 +20,7 @@ class DataTypeNumberBase : public IDataType public: static constexpr bool is_parametric = false; static constexpr auto family_name = TypeName; + static constexpr auto type_id = TypeId; using FieldType = T; using ColumnType = ColumnVector; diff --git a/src/DataTypes/DataTypeString.h b/src/DataTypes/DataTypeString.h index fd674505bc0..5f3bde43a13 100644 --- a/src/DataTypes/DataTypeString.h +++ b/src/DataTypes/DataTypeString.h @@ -6,10 +6,13 @@ namespace DB { +class ColumnString; + class DataTypeString final : public IDataType { public: using FieldType = String; + using ColumnType = ColumnString; static constexpr bool is_parametric = false; static constexpr auto type_id = TypeIndex::String; diff --git a/src/DataTypes/DataTypeUUID.h b/src/DataTypes/DataTypeUUID.h index 5ed7a912607..af9f1f35ca5 100644 --- a/src/DataTypes/DataTypeUUID.h +++ b/src/DataTypes/DataTypeUUID.h @@ -15,9 +15,10 @@ public: using FieldType = UUID; using ColumnType = ColumnVector; + static constexpr auto type_id = TypeIndex::UUID; const char * getFamilyName() const override { return "UUID"; } - TypeIndex getTypeId() const override { return TypeIndex::UUID; } + TypeIndex getTypeId() const override { return type_id; } Field getDefault() const override; diff --git a/src/DataTypes/DataTypesNumber.cpp b/src/DataTypes/DataTypesNumber.cpp index fef4c34d8b0..0c9a410077f 100644 --- a/src/DataTypes/DataTypesNumber.cpp +++ b/src/DataTypes/DataTypesNumber.cpp @@ -86,6 +86,7 @@ void registerDataTypeNumbers(DataTypeFactory & factory) factory.registerAlias("INT UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive); factory.registerAlias("INTEGER UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive); factory.registerAlias("BIGINT UNSIGNED", "UInt64", DataTypeFactory::CaseInsensitive); + factory.registerAlias("BIT", "UInt64", DataTypeFactory::CaseInsensitive); } } diff --git a/src/DataTypes/Serializations/SerializationLowCardinality.cpp b/src/DataTypes/Serializations/SerializationLowCardinality.cpp index 3d0256b1ed9..17a96a79af7 100644 --- a/src/DataTypes/Serializations/SerializationLowCardinality.cpp +++ b/src/DataTypes/Serializations/SerializationLowCardinality.cpp @@ -636,6 +636,9 @@ void SerializationLowCardinality::deserializeBinaryBulkWithMultipleStreams( if (!low_cardinality_state->index_type.need_global_dictionary) { + if(additional_keys == nullptr) 
+ throw Exception("No additional keys found.", ErrorCodes::INCORRECT_DATA); + ColumnPtr keys_column = additional_keys; if (low_cardinality_state->null_map) keys_column = ColumnNullable::create(additional_keys, low_cardinality_state->null_map); @@ -662,6 +665,9 @@ void SerializationLowCardinality::deserializeBinaryBulkWithMultipleStreams( if (!maps.additional_keys_map->empty()) { + if(additional_keys == nullptr) + throw Exception("No additional keys found.", ErrorCodes::INCORRECT_DATA); + auto used_add_keys = additional_keys->index(*maps.additional_keys_map, 0); if (dictionary_type->isNullable()) diff --git a/src/DataTypes/convertMySQLDataType.cpp b/src/DataTypes/convertMySQLDataType.cpp index 1b5e20bddce..ee897de9597 100644 --- a/src/DataTypes/convertMySQLDataType.cpp +++ b/src/DataTypes/convertMySQLDataType.cpp @@ -91,6 +91,10 @@ DataTypePtr convertMySQLDataType(MultiEnum type_support, res = std::make_shared(scale); } } + else if (type_name == "bit") + { + res = std::make_shared(); + } else if (type_support.isSet(MySQLDataTypesSupport::DECIMAL) && (type_name == "numeric" || type_name == "decimal")) { if (precision <= DecimalUtils::max_precision) diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 23fd353a5f0..c8484e6088d 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -21,6 +21,8 @@ namespace ProfileEvents extern const Event RemoteFSUnusedPrefetches; extern const Event RemoteFSPrefetchedReads; extern const Event RemoteFSUnprefetchedReads; + extern const Event RemoteFSLazySeeks; + extern const Event RemoteFSSeeksWithReset; extern const Event RemoteFSBuffers; } @@ -152,11 +154,16 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; Stopwatch watch; { - size = prefetch_future.get(); + auto result = prefetch_future.get(); + size = result.size; + auto offset = result.offset; + assert(offset < size); + if (size) { memory.swap(prefetch_buffer); - set(memory.data(), memory.size()); + size -= offset; + set(memory.data() + offset, size); working_buffer.resize(size); file_offset_of_buffer_end += size; } @@ -168,16 +175,23 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() else { ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads); - size = readInto(memory.data(), memory.size()).get(); + auto result = readInto(memory.data(), memory.size()).get(); + size = result.size; + auto offset = result.offset; + assert(offset < size); if (size) { - set(memory.data(), memory.size()); + size -= offset; + set(memory.data() + offset, size); working_buffer.resize(size); file_offset_of_buffer_end += size; } } + if (file_offset_of_buffer_end != impl->offset()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected equality {} == {}. It's a bug", file_offset_of_buffer_end, impl->offset()); + prefetch_future = {}; return size; } @@ -231,18 +245,22 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence pos = working_buffer.end(); - /// Note: we read in range [file_offset_of_buffer_end, read_until_position). - if (read_until_position && file_offset_of_buffer_end < *read_until_position - && static_cast(file_offset_of_buffer_end) >= getPosition() - && static_cast(file_offset_of_buffer_end) < getPosition() + static_cast(min_bytes_for_seek)) + /** + * Lazy ignore. 
Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. + * Note: we read in range [file_offset_of_buffer_end, read_until_position). + */ + off_t file_offset_before_seek = impl->offset(); + if (impl->initialized() + && read_until_position && file_offset_of_buffer_end < *read_until_position + && static_cast(file_offset_of_buffer_end) > file_offset_before_seek + && static_cast(file_offset_of_buffer_end) < file_offset_before_seek + static_cast(min_bytes_for_seek)) { - /** - * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. - */ - bytes_to_ignore = file_offset_of_buffer_end - getPosition(); + ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks); + bytes_to_ignore = file_offset_of_buffer_end - file_offset_before_seek; } else { + ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset); impl->reset(); } diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index be63223b313..4db0c9e3c71 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -65,7 +65,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata } -size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) +ReadBufferFromRemoteFSGather::ReadResult ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) { /** * Set `data` to current working and internal buffers. @@ -73,23 +73,24 @@ size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t o */ set(data, size); - absolute_position = offset; + file_offset_of_buffer_end = offset; bytes_to_ignore = ignore; + if (bytes_to_ignore) + assert(initialized()); auto result = nextImpl(); - bytes_to_ignore = 0; if (result) - return working_buffer.size(); + return {working_buffer.size(), BufferBase::offset()}; - return 0; + return {0, 0}; } void ReadBufferFromRemoteFSGather::initialize() { /// One clickhouse file can be split into multiple files in remote fs. - auto current_buf_offset = absolute_position; + auto current_buf_offset = file_offset_of_buffer_end; for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i) { const auto & [file_path, size] = metadata.remote_fs_objects[i]; @@ -144,7 +145,6 @@ bool ReadBufferFromRemoteFSGather::nextImpl() return readImpl(); } - bool ReadBufferFromRemoteFSGather::readImpl() { swap(*current_buf); @@ -155,15 +155,26 @@ bool ReadBufferFromRemoteFSGather::readImpl() * we save how many bytes need to be ignored (new_offset - position() bytes). */ if (bytes_to_ignore) + { current_buf->ignore(bytes_to_ignore); + bytes_to_ignore = 0; + } - auto result = current_buf->next(); + bool result = current_buf->hasPendingData(); + if (result) + { + /// bytes_to_ignore already added. 
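The seek() changes above choose between a "lazy" seek — remember how many bytes to skip on the next read and count RemoteFSLazySeeks — and a full reset of the remote stream counted as RemoteFSSeeksWithReset. The following standalone sketch shows only that decision, with an illustrative planner type instead of the real buffer classes; it also simplifies the condition (the actual code additionally requires the implementation buffer to be initialized and a read-until position to be set).

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

/// Decide how to serve a forward seek over a remote file without
/// tearing down the connection for every small jump.
struct LazySeekPlanner
{
    uint64_t current_offset = 0;      /// position the underlying stream will read from next
    uint64_t min_bytes_for_seek = 0;  /// jumps shorter than this are cheaper to skip than to reconnect
    uint64_t bytes_to_ignore = 0;     /// bytes to discard from the next read (the "lazy" part)

    /// Returns true if the seek can be served lazily, false if the stream must be reset.
    bool seekTo(uint64_t target, std::optional<uint64_t> read_until_position)
    {
        const bool within_window = target > current_offset
            && target < current_offset + min_bytes_for_seek
            && (!read_until_position || target < *read_until_position);

        if (within_window)
        {
            bytes_to_ignore = target - current_offset;   /// skipped on the next read
            return true;                                  /// counted as RemoteFSLazySeeks
        }

        bytes_to_ignore = 0;
        current_offset = target;                          /// reconnect from the new offset
        return false;                                     /// counted as RemoteFSSeeksWithReset
    }
};

int main()
{
    LazySeekPlanner planner{.current_offset = 100, .min_bytes_for_seek = 4096};
    std::cout << planner.seekTo(600, std::nullopt) << ' '        /// 1: lazy, skip 500 bytes
              << planner.seekTo(1'000'000, std::nullopt) << '\n'; /// 0: reset the stream
}
```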
+ file_offset_of_buffer_end += current_buf->available(); + } + else + { + result = current_buf->next(); + if (result) + file_offset_of_buffer_end += current_buf->buffer().size(); + } swap(*current_buf); - if (result) - absolute_position += working_buffer.size(); - return result; } @@ -180,7 +191,6 @@ void ReadBufferFromRemoteFSGather::reset() current_buf.reset(); } - String ReadBufferFromRemoteFSGather::getFileName() const { return canonical_path; diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index 9dd5d79d883..ddd651f47a1 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -37,10 +37,20 @@ public: void setReadUntilPosition(size_t position) override; - size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); + struct ReadResult + { + size_t size = 0; + size_t offset = 0; + }; + + ReadResult readInto(char * data, size_t size, size_t offset, size_t ignore = 0); size_t getFileSize() const; + size_t offset() const { return file_offset_of_buffer_end; } + + bool initialized() const { return current_buf != nullptr; } + protected: virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const = 0; @@ -57,8 +67,13 @@ private: size_t current_buf_idx = 0; - size_t absolute_position = 0; + size_t file_offset_of_buffer_end = 0; + /** + * File: |___________________| + * Buffer: |~~~~~~~| + * file_offset_of_buffer_end: ^ + */ size_t bytes_to_ignore = 0; size_t read_until_position = 0; diff --git a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp index 112124d9fd7..c21a55d68ac 100644 --- a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp @@ -20,7 +20,7 @@ ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( off_t ReadIndirectBufferFromRemoteFS::getPosition() { - return impl->absolute_position - available(); + return impl->file_offset_of_buffer_end - available(); } @@ -35,29 +35,29 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) if (whence == SEEK_CUR) { /// If position within current working buffer - shift pos. - if (!working_buffer.empty() && size_t(getPosition() + offset_) < impl->absolute_position) + if (!working_buffer.empty() && size_t(getPosition() + offset_) < impl->file_offset_of_buffer_end) { pos += offset_; return getPosition(); } else { - impl->absolute_position += offset_; + impl->file_offset_of_buffer_end += offset_; } } else if (whence == SEEK_SET) { /// If position within current working buffer - shift pos. 
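The ReadIndirectBufferFromRemoteFS::seek() hunk that follows relies on the invariant drawn in the header comment: the working buffer holds the bytes that end at impl->file_offset_of_buffer_end, so a SEEK_SET whose target falls inside that window only has to move pos. A rough sketch of that offset-to-buffer-position mapping, with an illustrative free function rather than the actual buffer API:

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

/// The working buffer holds the last `buffer_size` bytes ending at
/// `file_offset_of_buffer_end`:
///   File:   |______________________|
///   Buffer:            |~~~~~~~|
///                              ^ file_offset_of_buffer_end
/// For SEEK_SET, return the in-buffer position if the target is already
/// buffered; otherwise the caller has to re-read from the new offset.
std::optional<size_t> positionInBuffer(uint64_t target, uint64_t file_offset_of_buffer_end, size_t buffer_size)
{
    if (buffer_size == 0)
        return std::nullopt;

    const uint64_t buffer_begin = file_offset_of_buffer_end - buffer_size;
    if (target >= buffer_begin && target < file_offset_of_buffer_end)
        return static_cast<size_t>(target - buffer_begin);   /// pos = working_buffer.begin() + result

    return std::nullopt;
}

int main()
{
    /// Buffer covers file range [1000, 1100).
    std::cout << *positionInBuffer(1040, 1100, 100) << ' '          /// 40
              << positionInBuffer(900, 1100, 100).has_value() << '\n';  /// 0: must re-seek
}
```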
if (!working_buffer.empty() - && size_t(offset_) >= impl->absolute_position - working_buffer.size() - && size_t(offset_) < impl->absolute_position) + && size_t(offset_) >= impl->file_offset_of_buffer_end - working_buffer.size() + && size_t(offset_) < impl->file_offset_of_buffer_end) { - pos = working_buffer.end() - (impl->absolute_position - offset_); + pos = working_buffer.end() - (impl->file_offset_of_buffer_end - offset_); return getPosition(); } else { - impl->absolute_position = offset_; + impl->file_offset_of_buffer_end = offset_; } } else @@ -66,7 +66,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) impl->reset(); pos = working_buffer.end(); - return impl->absolute_position; + return impl->file_offset_of_buffer_end; } diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 945b2d3eb7e..4be55ff3ecf 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -8,7 +8,6 @@ #include #include -#include #include #include @@ -28,7 +27,7 @@ namespace CurrentMetrics namespace DB { -size_t ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) +ReadBufferFromRemoteFSGather::ReadResult ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) { return reader->readInto(data, size, offset, ignore); } @@ -44,18 +43,18 @@ std::future ThreadPoolRemoteFSReader::submit(Reques { auto task = std::make_shared>([request] { - setThreadName("ThreadPoolRemoteFSRead"); + setThreadName("VFSRead"); CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; auto * remote_fs_fd = assert_cast(request.descriptor.get()); Stopwatch watch(CLOCK_MONOTONIC); - auto bytes_read = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore); + auto [bytes_read, offset] = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore); watch.stop(); ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read); - return bytes_read; + return Result{ .size = bytes_read, .offset = offset }; }); auto future = task->get_future(); diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.h b/src/Disks/IO/ThreadPoolRemoteFSReader.h index c300162e214..b2d5f11724a 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.h +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.h @@ -3,12 +3,12 @@ #include #include #include +#include #include namespace DB { -class ReadBufferFromRemoteFSGather; class ThreadPoolRemoteFSReader : public IAsynchronousReader { @@ -28,9 +28,9 @@ public: struct ThreadPoolRemoteFSReader::RemoteFSFileDescriptor : public IFileDescriptor { public: - RemoteFSFileDescriptor(std::shared_ptr reader_) : reader(reader_) {} + explicit RemoteFSFileDescriptor(std::shared_ptr reader_) : reader(reader_) {} - size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); + ReadBufferFromRemoteFSGather::ReadResult readInto(char * data, size_t size, size_t offset, size_t ignore = 0); private: std::shared_ptr reader; diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h index d355d785cea..18ed733ff01 100644 --- a/src/Disks/S3/DiskS3.h +++ b/src/Disks/S3/DiskS3.h @@ -168,7 +168,7 @@ private: inline static const String RESTORE_FILE_NAME = "restore"; /// Key has format: ../../r{revision}-{operation} - const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"}; + 
const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+)$"}; /// Object contains information about schema version. inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION"; diff --git a/src/Functions/FunctionUnixTimestamp64.h b/src/Functions/FunctionUnixTimestamp64.h index 8c507077acd..5248f524a2b 100644 --- a/src/Functions/FunctionUnixTimestamp64.h +++ b/src/Functions/FunctionUnixTimestamp64.h @@ -56,7 +56,7 @@ public: const auto & source_data = typeid_cast &>(col).getData(); - Int32 scale_diff = typeid_cast(*src.type).getScale() - target_scale; + const Int32 scale_diff = typeid_cast(*src.type).getScale() - target_scale; if (scale_diff == 0) { for (size_t i = 0; i < input_rows_count; ++i) diff --git a/src/Functions/FunctionsConversion.h b/src/Functions/FunctionsConversion.h index 9238cc81c37..ac360834c94 100644 --- a/src/Functions/FunctionsConversion.h +++ b/src/Functions/FunctionsConversion.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -34,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -850,11 +852,15 @@ struct ConvertImpl struct ConvertImplGenericToString { - static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) + static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) { + static_assert(std::is_same_v || std::is_same_v, + "Can be used only to serialize to ColumnString or ColumnFixedString"); + ColumnUInt8::MutablePtr null_map = copyNullMap(arguments[0].column); const auto & col_with_type_and_name = columnGetNested(arguments[0]); @@ -862,27 +868,25 @@ struct ConvertImplGenericToString const IColumn & col_from = *col_with_type_and_name.column; size_t size = col_from.size(); + auto col_to = result_type->createColumn(); - auto col_to = ColumnString::create(); - - ColumnString::Chars & data_to = col_to->getChars(); - ColumnString::Offsets & offsets_to = col_to->getOffsets(); - - data_to.resize(size * 2); /// Using coefficient 2 for initial size is arbitrary. - offsets_to.resize(size); - - WriteBufferFromVector write_buffer(data_to); - - FormatSettings format_settings; - auto serialization = type.getDefaultSerialization(); - for (size_t i = 0; i < size; ++i) { - serialization->serializeText(col_from, i, write_buffer, format_settings); - writeChar(0, write_buffer); - offsets_to[i] = write_buffer.count(); - } + ColumnStringHelpers::WriteHelper write_helper( + assert_cast(*col_to), + size); - write_buffer.finalize(); + auto & write_buffer = write_helper.getWriteBuffer(); + + FormatSettings format_settings; + auto serialization = type.getDefaultSerialization(); + for (size_t i = 0; i < size; ++i) + { + serialization->serializeText(col_from, i, write_buffer, format_settings); + write_helper.rowWritten(); + } + + write_helper.finalize(); + } if (result_type->isNullable() && null_map) return ColumnNullable::create(std::move(col_to), std::move(null_map)); @@ -1006,7 +1010,8 @@ inline bool tryParseImpl(DataTypeUUID::FieldType & x, ReadBuffer & else message_buf << " at begin of string"; - if (isNativeNumber(to_type)) + // Currently there are no functions toIPv{4,6}Or{Null,Zero} + if (isNativeNumber(to_type) && !(to_type.getName() == "IPv4" || to_type.getName() == "IPv6")) message_buf << ". 
Note: there are to" << to_type.getName() << "OrZero and to" << to_type.getName() << "OrNull functions, which returns zero/NULL instead of throwing exception."; throw Exception(message_buf.str(), ErrorCodes::CANNOT_PARSE_TEXT); @@ -1285,40 +1290,35 @@ template struct ConvertImpl, DataTypeFixedString>, ToDataType, Name, ConvertReturnNullOnErrorTag> : ConvertThroughParsing {}; -/// Generic conversion of any type from String. Used for complex types: Array and Tuple. +/// Generic conversion of any type from String. Used for complex types: Array and Tuple or types with custom serialization. +template struct ConvertImplGenericFromString { - static ColumnPtr execute(ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) + static ColumnPtr execute(ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t input_rows_count) { + static_assert(std::is_same_v || std::is_same_v, + "Can be used only to parse from ColumnString or ColumnFixedString"); + const IColumn & col_from = *arguments[0].column; - size_t size = col_from.size(); - const IDataType & data_type_to = *result_type; - - if (const ColumnString * col_from_string = checkAndGetColumn(&col_from)) + if (const StringColumnType * col_from_string = checkAndGetColumn(&col_from)) { auto res = data_type_to.createColumn(); IColumn & column_to = *res; - column_to.reserve(size); - - const ColumnString::Chars & chars = col_from_string->getChars(); - const IColumn::Offsets & offsets = col_from_string->getOffsets(); - - size_t current_offset = 0; + column_to.reserve(input_rows_count); FormatSettings format_settings; auto serialization = data_type_to.getDefaultSerialization(); - for (size_t i = 0; i < size; ++i) + for (size_t i = 0; i < input_rows_count; ++i) { - ReadBufferFromMemory read_buffer(&chars[current_offset], offsets[i] - current_offset - 1); + const auto & val = col_from_string->getDataAt(i); + ReadBufferFromMemory read_buffer(val.data, val.size); serialization->deserializeWholeText(column_to, read_buffer, format_settings); if (!read_buffer.eof()) throwExceptionForIncompletelyParsedValue(read_buffer, result_type); - - current_offset = offsets[i]; } return res; @@ -1767,7 +1767,7 @@ private: /// Generic conversion of any type to String. if (std::is_same_v) { - return ConvertImplGenericToString::execute(arguments, result_type); + return ConvertImplGenericToString::execute(arguments, result_type, input_rows_count); } else throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + getName(), @@ -2725,10 +2725,7 @@ private: /// Conversion from String through parsing. if (checkAndGetDataType(from_type_untyped.get())) { - return [] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t /*input_rows_count*/) - { - return ConvertImplGenericFromString::execute(arguments, result_type); - }; + return &ConvertImplGenericFromString::execute; } else { @@ -2745,10 +2742,7 @@ private: /// Conversion from String through parsing. if (checkAndGetDataType(from_type_untyped.get())) { - return [] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t /*input_rows_count*/) - { - return ConvertImplGenericFromString::execute(arguments, result_type); - }; + return &ConvertImplGenericFromString::execute; } const auto * from_type = checkAndGetDataType(from_type_untyped.get()); @@ -2816,10 +2810,7 @@ private: /// Conversion from String through parsing. 
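The rewritten ConvertImplGenericFromString above parses every source row with the target type's text deserializer and treats leftover bytes as an error (the `!read_buffer.eof()` check). The sketch below only mirrors the shape of that per-row "whole text" parsing loop, substituting std::istringstream for ClickHouse's serializations, so none of the names here belong to the real API.

```cpp
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

/// Generic "from String" conversion: each source row is parsed with the target
/// type's text parser, and trailing bytes after the value are treated as an error.
template <typename T>
std::vector<T> convertFromStrings(const std::vector<std::string> & rows)
{
    std::vector<T> result;
    result.reserve(rows.size());

    for (const auto & row : rows)
    {
        std::istringstream in(row);
        T value{};
        in >> value;
        if (in.fail())
            throw std::runtime_error("Cannot parse value: " + row);

        /// Whole-text parsing: anything left besides whitespace means the value was parsed incompletely.
        in >> std::ws;
        if (in.peek() != std::char_traits<char>::eof())
            throw std::runtime_error("Incompletely parsed value: " + row);

        result.push_back(value);
    }
    return result;
}

int main()
{
    for (double v : convertFromStrings<double>({"1.5", "  42 "}))
        std::cout << v << ' ';
    std::cout << '\n';   /// 1.5 42
}
```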
if (checkAndGetDataType(from_type_untyped.get())) { - return [] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t /*input_rows_count*/) - { - return ConvertImplGenericFromString::execute(arguments, result_type); - }; + return &ConvertImplGenericFromString::execute; } const auto * from_type = checkAndGetDataType(from_type_untyped.get()); @@ -3330,6 +3321,38 @@ private: return false; }; + auto make_custom_serialization_wrapper = [&](const auto & types) -> bool + { + using Types = std::decay_t; + using ToDataType = typename Types::RightType; + using FromDataType = typename Types::LeftType; + + if constexpr (WhichDataType(FromDataType::type_id).isStringOrFixedString()) + { + if (to_type->getCustomSerialization()) + { + ret = &ConvertImplGenericFromString::execute; + return true; + } + } + if constexpr (WhichDataType(ToDataType::type_id).isStringOrFixedString()) + { + if (from_type->getCustomSerialization()) + { + ret = [](ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t input_rows_count) -> ColumnPtr + { + return ConvertImplGenericToString::execute(arguments, result_type, input_rows_count); + }; + return true; + } + } + + return false; + }; + + if (callOnTwoTypeIndexes(from_type->getTypeId(), to_type->getTypeId(), make_custom_serialization_wrapper)) + return ret; + if (callOnIndexAndDataType(to_type->getTypeId(), make_default_wrapper)) return ret; diff --git a/src/Functions/FunctionsStringArray.h b/src/Functions/FunctionsStringArray.h index 27907626971..a6e705bb1af 100644 --- a/src/Functions/FunctionsStringArray.h +++ b/src/Functions/FunctionsStringArray.h @@ -749,7 +749,7 @@ private: { ColumnsWithTypeAndName cols; cols.emplace_back(col_arr.getDataPtr(), nested_type, "tmp"); - return ConvertImplGenericToString::execute(cols, std::make_shared()); + return ConvertImplGenericToString::execute(cols, std::make_shared(), col_arr.size()); } } diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index b2be45471c8..a27c9035c61 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -69,7 +69,8 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { Stopwatch watch; CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; - size = prefetch_future.get(); + auto result = prefetch_future.get(); + size = result.size; ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds()); } @@ -90,7 +91,7 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { /// No pending request. Do synchronous read. - auto size = readInto(memory.data(), memory.size()).get(); + auto [size, _] = readInto(memory.data(), memory.size()).get(); file_offset_of_buffer_end += size; if (size) @@ -201,4 +202,3 @@ void AsynchronousReadBufferFromFileDescriptor::rewind() } } - diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index e4a81623205..e79e72f3bec 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -49,10 +49,18 @@ public: size_t ignore = 0; }; - /// Less than requested amount of data can be returned. - /// If size is zero - the file has ended. - /// (for example, EINTR must be handled by implementation automatically) - using Result = size_t; + struct Result + { + /// size + /// Less than requested amount of data can be returned. + /// If size is zero - the file has ended. 
+ /// (for example, EINTR must be handled by implementation automatically) + size_t size = 0; + + /// offset + /// Optional. Useful when implementation needs to do ignore(). + size_t offset = 0; + }; /// Submit request and obtain a handle. This method don't perform any waits. /// If this method did not throw, the caller must wait for the result with 'wait' method diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 30484b14021..f01640cb95b 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -239,7 +239,8 @@ std::unique_ptr ReadBufferFromS3::initialize() } else { - req.SetRange(fmt::format("bytes={}-", offset)); + if (offset) + req.SetRange(fmt::format("bytes={}-", offset)); LOG_TEST(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset); } diff --git a/src/IO/SynchronousReader.cpp b/src/IO/SynchronousReader.cpp index 599299ddad4..4414da28d28 100644 --- a/src/IO/SynchronousReader.cpp +++ b/src/IO/SynchronousReader.cpp @@ -82,10 +82,9 @@ std::future SynchronousReader::submit(Request reque watch.stop(); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - return bytes_read; + return Result{ .size = bytes_read, .offset = 0}; + }); } } - - diff --git a/src/IO/ThreadPoolReader.cpp b/src/IO/ThreadPoolReader.cpp index 32bc13ecb75..63bc8fe7c49 100644 --- a/src/IO/ThreadPoolReader.cpp +++ b/src/IO/ThreadPoolReader.cpp @@ -117,7 +117,7 @@ std::future ThreadPoolReader::submit(Request reques if (!res) { /// The file has ended. - promise.set_value(0); + promise.set_value({0, 0}); watch.stop(); ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds()); @@ -176,7 +176,7 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - promise.set_value(bytes_read); + promise.set_value({bytes_read, 0}); return future; } } @@ -219,7 +219,7 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - return bytes_read; + return Result{ .size = bytes_read, .offset = 0 }; }); auto future = task->get_future(); diff --git a/src/IO/WriteBuffer.h b/src/IO/WriteBuffer.h index 9ceed533855..4d7f300a504 100644 --- a/src/IO/WriteBuffer.h +++ b/src/IO/WriteBuffer.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index abf1ae5472b..f1d04e4b3a7 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -116,82 +116,62 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q } } - /// In case of expression/function (order by 1+2 and 2*x1, greatest(1, 2)) replace - /// positions only if all literals are numbers, otherwise it is not positional. - bool positional = true; + const auto * ast_literal = typeid_cast(argument.get()); + if (!ast_literal) + return false; - /// Case when GROUP BY element is position. 
- if (const auto * ast_literal = typeid_cast(argument.get())) + auto which = ast_literal->value.getType(); + if (which != Field::Types::UInt64) + return false; + + auto pos = ast_literal->value.get(); + if (!pos || pos > columns.size()) + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Positional argument out of bounds: {} (exprected in range [1, {}]", + pos, columns.size()); + + const auto & column = columns[--pos]; + if (typeid_cast(column.get())) { - auto which = ast_literal->value.getType(); - if (which == Field::Types::UInt64) + argument = column->clone(); + } + else if (typeid_cast(column.get())) + { + std::function throw_if_aggregate_function = [&](ASTPtr node) { - auto pos = ast_literal->value.get(); - if (pos > 0 && pos <= columns.size()) + if (const auto * function = typeid_cast(node.get())) { - const auto & column = columns[--pos]; - if (typeid_cast(column.get())) + auto is_aggregate_function = AggregateFunctionFactory::instance().isAggregateFunctionName(function->name); + if (is_aggregate_function) { - argument = column->clone(); - } - else if (typeid_cast(column.get())) - { - std::function throw_if_aggregate_function = [&](ASTPtr node) - { - if (const auto * function = typeid_cast(node.get())) - { - auto is_aggregate_function = AggregateFunctionFactory::instance().isAggregateFunctionName(function->name); - if (is_aggregate_function) - { - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, - "Illegal value (aggregate function) for positional argument in {}", - ASTSelectQuery::expressionToString(expression)); - } - else - { - if (function->arguments) - { - for (const auto & arg : function->arguments->children) - throw_if_aggregate_function(arg); - } - } - } - }; - - if (expression == ASTSelectQuery::Expression::GROUP_BY) - throw_if_aggregate_function(column); - - argument = column->clone(); + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal value (aggregate function) for positional argument in {}", + ASTSelectQuery::expressionToString(expression)); } else { - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, - "Illegal value for positional argument in {}", - ASTSelectQuery::expressionToString(expression)); + if (function->arguments) + { + for (const auto & arg : function->arguments->children) + throw_if_aggregate_function(arg); + } } } - else if (pos > columns.size() || !pos) - { - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, - "Positional argument out of bounds: {} (exprected in range [1, {}]", - pos, columns.size()); - } - } - else - positional = false; - } - else if (const auto * ast_function = typeid_cast(argument.get())) - { - if (ast_function->arguments) - { - for (auto & arg : ast_function->arguments->children) - positional &= checkPositionalArguments(arg, select_query, expression); - } + }; + + if (expression == ASTSelectQuery::Expression::GROUP_BY) + throw_if_aggregate_function(column); + + argument = column->clone(); } else - positional = false; + { + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal value for positional argument in {}", + ASTSelectQuery::expressionToString(expression)); + } - return positional; + return true; } void replaceForPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_query, ASTSelectQuery::Expression expression) diff --git a/src/Interpreters/MySQL/tests/gtest_create_rewritten.cpp b/src/Interpreters/MySQL/tests/gtest_create_rewritten.cpp index 5e18b0de2e0..02af07bc00c 100644 --- a/src/Interpreters/MySQL/tests/gtest_create_rewritten.cpp +++ 
b/src/Interpreters/MySQL/tests/gtest_create_rewritten.cpp @@ -40,7 +40,7 @@ TEST(MySQLCreateRewritten, ColumnsDataType) {"TINYINT", "Int8"}, {"SMALLINT", "Int16"}, {"MEDIUMINT", "Int32"}, {"INT", "Int32"}, {"INTEGER", "Int32"}, {"BIGINT", "Int64"}, {"FLOAT", "Float32"}, {"DOUBLE", "Float64"}, {"VARCHAR(10)", "String"}, {"CHAR(10)", "String"}, {"Date", "Date"}, {"DateTime", "DateTime"}, - {"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"} + {"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"}, {"BIT", "UInt64"} }; for (const auto & [test_type, mapped_type] : test_types) diff --git a/src/Parsers/ExpressionListParsers.cpp b/src/Parsers/ExpressionListParsers.cpp index c06f262e605..157b67a6be3 100644 --- a/src/Parsers/ExpressionListParsers.cpp +++ b/src/Parsers/ExpressionListParsers.cpp @@ -402,7 +402,7 @@ bool ParserVariableArityOperatorList::parseImpl(Pos & pos, ASTPtr & node, Expect bool ParserBetweenExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { /// For the expression (subject [NOT] BETWEEN left AND right) - /// create an AST the same as for (subject> = left AND subject <= right). + /// create an AST the same as for (subject >= left AND subject <= right). ParserKeyword s_not("NOT"); ParserKeyword s_between("BETWEEN"); diff --git a/src/Processors/Sources/MySQLSource.cpp b/src/Processors/Sources/MySQLSource.cpp index 8e9cdcfda48..b0cb62340e9 100644 --- a/src/Processors/Sources/MySQLSource.cpp +++ b/src/Processors/Sources/MySQLSource.cpp @@ -2,6 +2,7 @@ #if USE_MYSQL #include +#include #include #include #include @@ -126,7 +127,7 @@ namespace { using ValueType = ExternalResultDescription::ValueType; - void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size) + void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size, enum enum_field_types mysql_type) { switch (type) { @@ -143,9 +144,24 @@ namespace read_bytes_size += 4; break; case ValueType::vtUInt64: - assert_cast(column).insertValue(value.getUInt()); - read_bytes_size += 8; + { + //we don't have enum enum_field_types definition in mysqlxx/Types.h, so we use literal values directly here. 
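The comment below notes that MYSQL_TYPE_BIT has to be compared through its literal value 16 because enum_field_types is not fully visible via mysqlxx/Types.h; the payload itself is a big-endian byte string of (bits + 7) / 8 bytes that the diff folds into a UInt64 column. A small standalone sketch of that decoding step — an illustrative helper, not the readBigEndianStrict routine actually used in the diff:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <stdexcept>

/// Decode a MySQL BIT(M) value: the wire format is a big-endian byte string of
/// ceil(M / 8) bytes, folded here into a single UInt64.
uint64_t decodeMySQLBit(const unsigned char * data, size_t size)
{
    if (size > sizeof(uint64_t))
        throw std::invalid_argument("BIT value wider than 64 bits");

    uint64_t result = 0;
    for (size_t i = 0; i < size; ++i)
        result = (result << 8) | data[i];   /// most significant byte first
    return result;
}

int main()
{
    /// BIT(20) needs (20 + 7) / 8 = 3 bytes; 0x01 0x02 0x03 -> 66051.
    const unsigned char payload[] = {0x01, 0x02, 0x03};
    std::cout << decodeMySQLBit(payload, sizeof(payload)) << '\n';
}
```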
+ if (static_cast(mysql_type) == 16) + { + size_t n = value.size(); + UInt64 val = 0UL; + ReadBufferFromMemory payload(const_cast(value.data()), n); + MySQLReplication::readBigEndianStrict(payload, reinterpret_cast(&val), n); + assert_cast(column).insertValue(val); + read_bytes_size += n; + } + else + { + assert_cast(column).insertValue(value.getUInt()); + read_bytes_size += 8; + } break; + } case ValueType::vtInt8: assert_cast(column).insertValue(value.getInt()); read_bytes_size += 1; @@ -258,12 +274,12 @@ Chunk MySQLSource::generate() { ColumnNullable & column_nullable = assert_cast(*columns[index]); const auto & data_type = assert_cast(*sample.type); - insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size); + insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index])); column_nullable.getNullMapData().emplace_back(false); } else { - insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size); + insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index])); } } else diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 67abd6db13a..0c5d7d93689 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -557,6 +557,8 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response std::lock_guard lock(conn_stats_mutex); conn_stats.updateLatency(elapsed); } + + operations.erase(response->xid); keeper_dispatcher->updateKeeperStatLatency(elapsed); last_op.set(std::make_unique(LastOp{ diff --git a/src/Server/KeeperTCPHandler.h b/src/Server/KeeperTCPHandler.h index fb6541d1f53..f98b269b8be 100644 --- a/src/Server/KeeperTCPHandler.h +++ b/src/Server/KeeperTCPHandler.h @@ -93,7 +93,7 @@ private: Poco::Timestamp established; - using Operations = std::map; + using Operations = std::unordered_map; Operations operations; LastOpMultiVersion last_op; diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index 09542c30636..c89affb5365 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -198,7 +198,9 @@ std::vector MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts & for (const auto i : collections::range(0, parts.size())) { const auto & part = parts[i]; - is_part_on_remote_disk[i] = part.data_part->isStoredOnRemoteDisk(); + bool part_on_remote_disk = part.data_part->isStoredOnRemoteDisk(); + is_part_on_remote_disk[i] = part_on_remote_disk; + do_not_steal_tasks |= part_on_remote_disk; /// Read marks for every data part. size_t sum_marks = 0; diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index a9e57617aad..b594b59fdfa 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -203,6 +203,8 @@ void MergeTreeReaderCompact::readData( { const auto & [name, type] = name_and_type; + adjustUpperBound(current_task_last_mark); /// Must go before seek. 
+ if (!isContinuousReading(from_mark, column_position)) seekToMark(from_mark, column_position); @@ -211,8 +213,6 @@ void MergeTreeReaderCompact::readData( if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes)) return nullptr; - /// For asynchronous reading from remote fs. - data_buffer->setReadUntilPosition(marks_loader.getMark(current_task_last_mark).offset_in_compressed_file); return data_buffer; }; @@ -275,6 +275,34 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index) } } +void MergeTreeReaderCompact::adjustUpperBound(size_t last_mark) +{ + auto right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file; + if (!right_offset) + { + /// If already reading till the end of file. + if (last_right_offset && *last_right_offset == 0) + return; + + last_right_offset = 0; // Zero value means the end of file. + if (cached_buffer) + cached_buffer->setReadUntilEnd(); + if (non_cached_buffer) + non_cached_buffer->setReadUntilEnd(); + } + else + { + if (last_right_offset && right_offset <= last_right_offset.value()) + return; + + last_right_offset = right_offset; + if (cached_buffer) + cached_buffer->setReadUntilPosition(right_offset); + if (non_cached_buffer) + non_cached_buffer->setReadUntilPosition(right_offset); + } +} + bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position) { if (!last_read_granule) diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.h b/src/Storages/MergeTree/MergeTreeReaderCompact.h index 350c8427eff..381b212df3c 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.h +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.h @@ -52,6 +52,9 @@ private: /// Should we read full column or only it's offsets std::vector read_only_offsets; + /// For asynchronous reading from remote fs. Same meaning as in MergeTreeReaderStream. + std::optional last_right_offset; + size_t next_mark = 0; std::optional> last_read_granule; @@ -67,6 +70,9 @@ private: MergeTreeMarksLoader & marks_loader, const ColumnPositions & column_positions, const MarkRanges & mark_ranges); + + /// For asynchronous reading from remote fs. 
+ void adjustUpperBound(size_t last_mark); }; } diff --git a/tests/ci/cancel_workflow_lambda/Dockerfile b/tests/ci/cancel_and_rerun_workflow_lambda/Dockerfile similarity index 100% rename from tests/ci/cancel_workflow_lambda/Dockerfile rename to tests/ci/cancel_and_rerun_workflow_lambda/Dockerfile diff --git a/tests/ci/cancel_workflow_lambda/app.py b/tests/ci/cancel_and_rerun_workflow_lambda/app.py similarity index 54% rename from tests/ci/cancel_workflow_lambda/app.py rename to tests/ci/cancel_and_rerun_workflow_lambda/app.py index e475fcb931a..bd1dc394086 100644 --- a/tests/ci/cancel_workflow_lambda/app.py +++ b/tests/ci/cancel_and_rerun_workflow_lambda/app.py @@ -1,12 +1,21 @@ #!/usr/bin/env python3 +from collections import namedtuple import json import time -import jwt +import jwt import requests import boto3 +NEED_RERUN_OR_CANCELL_WORKFLOWS = { + 13241696, # PR + 15834118, # Docs + 15522500, # MasterCI + 15516108, # ReleaseCI + 15797242, # BackportPR +} + # https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run # API_URL = 'https://api.github.com/repos/ClickHouse/ClickHouse' @@ -70,19 +79,32 @@ def _exec_get_with_retry(url): raise Exception("Cannot execute GET request with retries") -def get_workflows_cancel_urls_for_pull_request(pull_request_event): +WorkflowDescription = namedtuple('WorkflowDescription', + ['run_id', 'status', 'rerun_url', 'cancel_url']) + + +def get_workflows_description_for_pull_request(pull_request_event): head_branch = pull_request_event['head']['ref'] print("PR", pull_request_event['number'], "has head ref", head_branch) workflows = _exec_get_with_retry(API_URL + f"/actions/runs?branch={head_branch}") - workflows_urls_to_cancel = set([]) + workflow_descriptions = [] for workflow in workflows['workflow_runs']: - if workflow['status'] != 'completed': - print("Workflow", workflow['url'], "not finished, going to be cancelled") - workflows_urls_to_cancel.add(workflow['cancel_url']) - else: - print("Workflow", workflow['url'], "already finished, will not try to cancel") + if workflow['workflow_id'] in NEED_RERUN_OR_CANCELL_WORKFLOWS: + workflow_descriptions.append(WorkflowDescription( + run_id=workflow['id'], + status=workflow['status'], + rerun_url=workflow['rerun_url'], + cancel_url=workflow['cancel_url'])) - return workflows_urls_to_cancel + return workflow_descriptions + +def get_workflow_description(workflow_id): + workflow = _exec_get_with_retry(API_URL + f"/actions/runs/{workflow_id}") + return WorkflowDescription( + run_id=workflow['id'], + status=workflow['status'], + rerun_url=workflow['rerun_url'], + cancel_url=workflow['cancel_url']) def _exec_post_with_retry(url, token): headers = { @@ -99,11 +121,11 @@ def _exec_post_with_retry(url, token): raise Exception("Cannot execute POST request with retry") -def cancel_workflows(urls_to_cancel, token): +def exec_workflow_url(urls_to_cancel, token): for url in urls_to_cancel: - print("Cancelling workflow using url", url) + print("Post for workflow workflow using url", url) _exec_post_with_retry(url, token) - print("Workflow cancelled") + print("Workflow post finished") def main(event): token = get_token_from_aws() @@ -117,9 +139,39 @@ def main(event): print("PR has labels", labels) if action == 'closed' or 'do not test' in labels: print("PR merged/closed or manually labeled 'do not test' will kill workflows") - workflows_to_cancel = get_workflows_cancel_urls_for_pull_request(pull_request) - print(f"Found {len(workflows_to_cancel)} workflows to cancel") - cancel_workflows(workflows_to_cancel, 
token) + workflow_descriptions = get_workflows_description_for_pull_request(pull_request) + urls_to_cancel = [] + for workflow_description in workflow_descriptions: + if workflow_description.status != 'completed': + urls_to_cancel.append(workflow_description.cancel_url) + print(f"Found {len(urls_to_cancel)} workflows to cancel") + exec_workflow_url(urls_to_cancel, token) + elif action == 'labeled' and 'can be tested' in labels: + print("PR marked with can be tested label, rerun workflow") + workflow_descriptions = get_workflows_description_for_pull_request(pull_request) + if not workflow_descriptions: + print("Not found any workflows") + return + + sorted_workflows = list(sorted(workflow_descriptions, key=lambda x: x.run_id)) + most_recent_workflow = sorted_workflows[-1] + print("Latest workflow", most_recent_workflow) + if most_recent_workflow.status != 'completed': + print("Latest workflow is not completed, cancelling") + exec_workflow_url([most_recent_workflow.cancel_url], token) + print("Cancelled") + + for _ in range(30): + latest_workflow_desc = get_workflow_description(most_recent_workflow.run_id) + print("Checking latest workflow", latest_workflow_desc) + if latest_workflow_desc.status in ('completed', 'cancelled'): + print("Finally latest workflow done, going to rerun") + exec_workflow_url([most_recent_workflow.rerun_url], token) + print("Rerun finished, exiting") + break + print("Still have strange status") + time.sleep(3) + else: print("Nothing to do") diff --git a/tests/ci/cancel_workflow_lambda/requirements.txt b/tests/ci/cancel_and_rerun_workflow_lambda/requirements.txt similarity index 100% rename from tests/ci/cancel_workflow_lambda/requirements.txt rename to tests/ci/cancel_and_rerun_workflow_lambda/requirements.txt diff --git a/tests/ci/codebrowser_check.py b/tests/ci/codebrowser_check.py new file mode 100644 index 00000000000..eeb06d2c684 --- /dev/null +++ b/tests/ci/codebrowser_check.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 + + +import os +import subprocess +import logging + +from github import Github + +from env_helper import IMAGES_PATH, REPO_COPY +from stopwatch import Stopwatch +from upload_result_helper import upload_results +from s3_helper import S3Helper +from get_robot_token import get_best_robot_token +from pr_info import PRInfo +from commit_status_helper import post_commit_status +from docker_pull_helper import get_image_with_version +from tee_popen import TeePopen + +NAME = "Woboq Build (actions)" + +def get_run_command(repo_path, output_path, image): + cmd = "docker run " + \ + f"--volume={repo_path}:/repo_folder " \ + f"--volume={output_path}:/test_output " \ + f"-e 'DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/data' {image}" + return cmd + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + + stopwatch = Stopwatch() + + temp_path = os.getenv("TEMP_PATH", os.path.abspath(".")) + + pr_info = PRInfo() + + gh = Github(get_best_robot_token()) + + if not os.path.exists(temp_path): + os.makedirs(temp_path) + + docker_image = get_image_with_version(IMAGES_PATH, 'clickhouse/codebrowser') + s3_helper = S3Helper('https://s3.amazonaws.com') + + result_path = os.path.join(temp_path, "result_path") + if not os.path.exists(result_path): + os.makedirs(result_path) + + run_command = get_run_command(REPO_COPY, result_path, docker_image) + + logging.info("Going to run codebrowser: %s", run_command) + + run_log_path = os.path.join(temp_path, "runlog.log") + + with TeePopen(run_command, run_log_path) as process: + retcode = 
process.wait() + if retcode == 0: + logging.info("Run successfully") + else: + logging.info("Run failed") + + subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True) + + report_path = os.path.join(result_path, "html_report") + logging.info("Report path %s", report_path) + s3_path_prefix = "codebrowser" + html_urls = s3_helper.fast_parallel_upload_dir(report_path, s3_path_prefix, 'clickhouse-test-reports') + + index_html = 'HTML report' + + test_results = [(index_html, "Look at the report")] + + report_url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME) + + print(f"::notice ::Report url: {report_url}") + + post_commit_status(gh, pr_info.sha, NAME, "Report built", "success", report_url) diff --git a/tests/ci/pr_info.py b/tests/ci/pr_info.py index b1775f0fc6c..795fe9aaad3 100644 --- a/tests/ci/pr_info.py +++ b/tests/ci/pr_info.py @@ -33,7 +33,7 @@ def get_pr_for_commit(sha, ref): class PRInfo: - def __init__(self, github_event=None, need_orgs=False, need_changed_files=False): + def __init__(self, github_event=None, need_orgs=False, need_changed_files=False, labels_from_api=False): if not github_event: if GITHUB_EVENT_PATH: with open(GITHUB_EVENT_PATH, 'r', encoding='utf-8') as event_file: @@ -61,7 +61,12 @@ class PRInfo: self.head_ref = github_event['pull_request']['head']['ref'] self.head_name = github_event['pull_request']['head']['repo']['full_name'] - self.labels = {l['name'] for l in github_event['pull_request']['labels']} + if labels_from_api: + response = requests.get(f"https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") + self.labels = {l['name'] for l in response.json()} + else: + self.labels = {l['name'] for l in github_event['pull_request']['labels']} + self.user_login = github_event['pull_request']['user']['login'] self.user_orgs = set([]) if need_orgs: @@ -90,7 +95,12 @@ class PRInfo: f"https://api.github.com/repos/{GITHUB_REPOSITORY}/compare/{github_event['before']}...{self.sha}" else: self.number = pull_request['number'] - self.labels = {l['name'] for l in pull_request['labels']} + if labels_from_api: + response = requests.get(f"https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") + self.labels = {l['name'] for l in response.json()} + else: + self.labels = {l['name'] for l in pull_request['labels']} + self.base_ref = pull_request['base']['ref'] self.base_name = pull_request['base']['repo']['full_name'] self.head_ref = pull_request['head']['ref'] diff --git a/tests/ci/run_check.py b/tests/ci/run_check.py index 5fa6a228e46..692cda18f20 100644 --- a/tests/ci/run_check.py +++ b/tests/ci/run_check.py @@ -90,6 +90,7 @@ def pr_is_by_trusted_user(pr_user_login, pr_user_orgs): # can be skipped entirely. def should_run_checks_for_pr(pr_info): # Consider the labels and whether the user is trusted. 
+ print("Got labels", pr_info.labels) force_labels = set(['force tests']).intersection(pr_info.labels) if force_labels: return True, "Labeled '{}'".format(', '.join(force_labels)) @@ -109,7 +110,7 @@ def should_run_checks_for_pr(pr_info): if __name__ == "__main__": logging.basicConfig(level=logging.INFO) - pr_info = PRInfo(need_orgs=True) + pr_info = PRInfo(need_orgs=True, labels_from_api=True) can_run, description = should_run_checks_for_pr(pr_info) gh = Github(get_best_robot_token()) commit = get_commit(gh, pr_info.sha) diff --git a/tests/ci/s3_helper.py b/tests/ci/s3_helper.py index 27a613f7787..753f036a8d7 100644 --- a/tests/ci/s3_helper.py +++ b/tests/ci/s3_helper.py @@ -4,6 +4,7 @@ import logging import os import re import shutil +import time from multiprocessing.dummy import Pool import boto3 @@ -83,6 +84,58 @@ class S3Helper: else: return S3Helper.copy_file_to_local(S3_BUILDS_BUCKET, file_path, s3_path) + def fast_parallel_upload_dir(self, dir_path, s3_dir_path, bucket_name): + all_files = [] + + for root, _, files in os.walk(dir_path): + for file in files: + all_files.append(os.path.join(root, file)) + + logging.info("Files found %s", len(all_files)) + + counter = 0 + t = time.time() + sum_time = 0 + def upload_task(file_path): + nonlocal counter + nonlocal t + nonlocal sum_time + try: + s3_path = file_path.replace(dir_path, s3_dir_path) + metadata = {} + if s3_path.endswith("html"): + metadata['ContentType'] = "text/html; charset=utf-8" + elif s3_path.endswith("css"): + metadata['ContentType'] = "text/css; charset=utf-8" + elif s3_path.endswith("js"): + metadata['ContentType'] = "text/javascript; charset=utf-8" + + # Retry + for i in range(5): + try: + self.client.upload_file(file_path, bucket_name, s3_path, ExtraArgs=metadata) + break + except Exception as ex: + if i == 4: + raise ex + time.sleep(0.1 * i) + + counter += 1 + if counter % 1000 == 0: + sum_time += int(time.time() - t) + print("Uploaded", counter, "-", int(time.time() - t), "s", "sum time", sum_time, "s") + t = time.time() + except Exception as ex: + logging.critical("Failed to upload file, expcetion %s", ex) + return "https://s3.amazonaws.com/{bucket}/{path}".format(bucket=bucket_name, path=s3_path) + + p = Pool(256) + + logging.basicConfig(level=logging.CRITICAL) + result = sorted(_flatten_list(p.map(upload_task, all_files))) + logging.basicConfig(level=logging.INFO) + return result + def _upload_folder_to_s3(self, folder_path, s3_folder_path, bucket_name, keep_dirs_in_s3_path, upload_symlinks): logging.info("Upload folder '%s' to bucket=%s of s3 folder '%s'", folder_path, bucket_name, s3_folder_path) if not os.path.exists(folder_path): diff --git a/tests/ci/workflow_approve_rerun_lambda/app.py b/tests/ci/workflow_approve_rerun_lambda/app.py index 0f601f7f52a..f2502f605af 100644 --- a/tests/ci/workflow_approve_rerun_lambda/app.py +++ b/tests/ci/workflow_approve_rerun_lambda/app.py @@ -23,7 +23,7 @@ SUSPICIOUS_PATTERNS = [ ] MAX_RETRY = 5 -MAX_WORKFLOW_RERUN = 5 +MAX_WORKFLOW_RERUN = 7 WorkflowDescription = namedtuple('WorkflowDescription', ['name', 'action', 'run_id', 'event', 'workflow_id', 'conclusion', 'status', 'api_url', @@ -44,6 +44,7 @@ NEED_RERUN_WORKFLOWS = { 15834118, # Docs 15522500, # MasterCI 15516108, # ReleaseCI + 15797242, # BackportPR } # Individual trusted contirbutors who are not in any trusted organization. 
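Note on the S3 helper above: fast_parallel_upload_dir wraps every self.client.upload_file call in a small retry loop (five attempts, a linearly growing sleep, re-raise on the final attempt) and fans the uploads out over a thread pool. The standalone Python sketch below isolates just that retry-with-backoff idea; upload_with_retry and its parameters are illustrative names for this sketch and are not part of the diff.

# Illustrative sketch only: the retry-with-linear-backoff pattern used around
# self.client.upload_file(...) in fast_parallel_upload_dir above.
# upload_with_retry and its arguments are hypothetical, not taken from the diff.
import time


def upload_with_retry(upload_once, attempts=5, base_delay=0.1):
    """Call upload_once(); on failure retry with a linearly growing sleep."""
    for i in range(attempts):
        try:
            return upload_once()
        except Exception:
            if i == attempts - 1:
                raise  # give up after the last attempt, as the diff does
            time.sleep(base_delay * i)  # 0s, 0.1s, 0.2s, ... between attempts


if __name__ == "__main__":
    # Simulate two transient failures followed by success.
    state = {"calls": 0}

    def flaky():
        state["calls"] += 1
        if state["calls"] < 3:
            raise RuntimeError("transient error")
        return "ok"

    print(upload_with_retry(flaky))  # prints "ok" after the third call

The fixed attempt count with a re-raise on the last try keeps one flaky S3 request from failing the whole report upload while still surfacing persistent errors.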
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 96c44a9bbf6..d440f2de0ca 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -1527,7 +1527,7 @@ class ClickHouseCluster: if os.path.exists(self.mysql_dir): shutil.rmtree(self.mysql_dir) os.makedirs(self.mysql_logs_dir) - os.chmod(self.mysql_logs_dir, stat.S_IRWXO) + os.chmod(self.mysql_logs_dir, stat.S_IRWXU | stat.S_IRWXO) subprocess_check_call(self.base_mysql_cmd + common_opts) self.up_called = True self.wait_mysql_to_start() @@ -1537,7 +1537,7 @@ class ClickHouseCluster: if os.path.exists(self.mysql8_dir): shutil.rmtree(self.mysql8_dir) os.makedirs(self.mysql8_logs_dir) - os.chmod(self.mysql8_logs_dir, stat.S_IRWXO) + os.chmod(self.mysql8_logs_dir, stat.S_IRWXU | stat.S_IRWXO) subprocess_check_call(self.base_mysql8_cmd + common_opts) self.wait_mysql8_to_start() @@ -1546,7 +1546,7 @@ class ClickHouseCluster: if os.path.exists(self.mysql_cluster_dir): shutil.rmtree(self.mysql_cluster_dir) os.makedirs(self.mysql_cluster_logs_dir) - os.chmod(self.mysql_cluster_logs_dir, stat.S_IRWXO) + os.chmod(self.mysql_cluster_logs_dir, stat.S_IRWXU | stat.S_IRWXO) subprocess_check_call(self.base_mysql_cluster_cmd + common_opts) self.up_called = True @@ -1557,7 +1557,7 @@ class ClickHouseCluster: if os.path.exists(self.postgres_dir): shutil.rmtree(self.postgres_dir) os.makedirs(self.postgres_logs_dir) - os.chmod(self.postgres_logs_dir, stat.S_IRWXO) + os.chmod(self.postgres_logs_dir, stat.S_IRWXU | stat.S_IRWXO) subprocess_check_call(self.base_postgres_cmd + common_opts) self.up_called = True @@ -1566,11 +1566,11 @@ class ClickHouseCluster: if self.with_postgres_cluster and self.base_postgres_cluster_cmd: print('Setup Postgres') os.makedirs(self.postgres2_logs_dir) - os.chmod(self.postgres2_logs_dir, stat.S_IRWXO) + os.chmod(self.postgres2_logs_dir, stat.S_IRWXU | stat.S_IRWXO) os.makedirs(self.postgres3_logs_dir) - os.chmod(self.postgres3_logs_dir, stat.S_IRWXO) + os.chmod(self.postgres3_logs_dir, stat.S_IRWXU | stat.S_IRWXO) os.makedirs(self.postgres4_logs_dir) - os.chmod(self.postgres4_logs_dir, stat.S_IRWXO) + os.chmod(self.postgres4_logs_dir, stat.S_IRWXU | stat.S_IRWXO) subprocess_check_call(self.base_postgres_cluster_cmd + common_opts) self.up_called = True self.wait_postgres_cluster_to_start() @@ -1591,7 +1591,7 @@ class ClickHouseCluster: if self.with_rabbitmq and self.base_rabbitmq_cmd: logging.debug('Setup RabbitMQ') os.makedirs(self.rabbitmq_logs_dir) - os.chmod(self.rabbitmq_logs_dir, stat.S_IRWXO) + os.chmod(self.rabbitmq_logs_dir, stat.S_IRWXU | stat.S_IRWXO) for i in range(5): subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes']) @@ -1604,7 +1604,7 @@ class ClickHouseCluster: if self.with_hdfs and self.base_hdfs_cmd: logging.debug('Setup HDFS') os.makedirs(self.hdfs_logs_dir) - os.chmod(self.hdfs_logs_dir, stat.S_IRWXO) + os.chmod(self.hdfs_logs_dir, stat.S_IRWXU | stat.S_IRWXO) subprocess_check_call(self.base_hdfs_cmd + common_opts) self.up_called = True self.make_hdfs_api() @@ -1613,7 +1613,7 @@ class ClickHouseCluster: if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd: logging.debug('Setup kerberized HDFS') os.makedirs(self.hdfs_kerberized_logs_dir) - os.chmod(self.hdfs_kerberized_logs_dir, stat.S_IRWXO) + os.chmod(self.hdfs_kerberized_logs_dir, stat.S_IRWXU | stat.S_IRWXO) run_and_check(self.base_kerberized_hdfs_cmd + common_opts) self.up_called = True self.make_hdfs_api(kerberized=True) @@ -1669,7 
+1669,7 @@ class ClickHouseCluster: if self.with_jdbc_bridge and self.base_jdbc_bridge_cmd: os.makedirs(self.jdbc_driver_logs_dir) - os.chmod(self.jdbc_driver_logs_dir, stat.S_IRWXO) + os.chmod(self.jdbc_driver_logs_dir, stat.S_IRWXU | stat.S_IRWXO) subprocess_check_call(self.base_jdbc_bridge_cmd + ['up', '-d']) self.up_called = True diff --git a/tests/integration/pytest.ini b/tests/integration/pytest.ini index 2238b173227..2a57ea5a229 100644 --- a/tests/integration/pytest.ini +++ b/tests/integration/pytest.ini @@ -1,7 +1,7 @@ [pytest] python_files = test*.py norecursedirs = _instances* -timeout = 1800 +timeout = 900 junit_duration_report = call junit_suite_name = integration log_level = DEBUG diff --git a/tests/integration/test_materialized_mysql_database/materialize_with_ddl.py b/tests/integration/test_materialized_mysql_database/materialize_with_ddl.py index 43fab165c53..007d672aaea 100644 --- a/tests/integration/test_materialized_mysql_database/materialize_with_ddl.py +++ b/tests/integration/test_materialized_mysql_database/materialize_with_ddl.py @@ -624,7 +624,7 @@ def err_sync_user_privs_with_materialized_mysql_database(clickhouse_node, mysql_ service_name)) assert "priv_err_db" in clickhouse_node.query("SHOW DATABASES") assert "test_table_1" not in clickhouse_node.query("SHOW TABLES FROM priv_err_db") - clickhouse_node.query("DETACH DATABASE priv_err_db") + clickhouse_node.query_with_retry("DETACH DATABASE priv_err_db") mysql_node.query("REVOKE SELECT ON priv_err_db.* FROM 'test'@'%'") time.sleep(3) @@ -743,7 +743,7 @@ def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_nam time.sleep(sleep_time) clickhouse_node.query("SELECT * FROM test_database.test_table") - clickhouse_node.query("DETACH DATABASE test_database") + clickhouse_node.query_with_retry("DETACH DATABASE test_database") clickhouse_node.query("ATTACH DATABASE test_database") check_query(clickhouse_node, "SELECT * FROM test_database.test_table ORDER BY id FORMAT TSV", '1\n2\n') @@ -784,7 +784,7 @@ def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name): mysql_node.alloc_connection() - clickhouse_node.query("DETACH DATABASE kill_mysql_while_insert") + clickhouse_node.query_with_retry("DETACH DATABASE kill_mysql_while_insert") clickhouse_node.query("ATTACH DATABASE kill_mysql_while_insert") result = mysql_node.query_and_get_data("SELECT COUNT(1) FROM kill_mysql_while_insert.test") @@ -1072,3 +1072,68 @@ def table_overrides(clickhouse_node, mysql_node, service_name): check_query(clickhouse_node, "SELECT type FROM system.columns WHERE database = 'table_overrides' AND table = 't1' AND name = 'sensor_id'", "UInt64\n") clickhouse_node.query("DROP DATABASE IF EXISTS table_overrides") mysql_node.query("DROP DATABASE IF EXISTS table_overrides") + +def materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, mysql_node, service_name): + mysql_node.query("DROP DATABASE IF EXISTS test_database_datatype") + clickhouse_node.query("DROP DATABASE IF EXISTS test_database_datatype") + mysql_node.query("CREATE DATABASE test_database_datatype DEFAULT CHARACTER SET 'utf8'") + mysql_node.query(""" + CREATE TABLE test_database_datatype.t1 ( + `v1` int(10) unsigned AUTO_INCREMENT, + `v2` TINYINT, + `v3` SMALLINT, + `v4` BIGINT, + `v5` int, + `v6` TINYINT unsigned, + `v7` SMALLINT unsigned, + `v8` BIGINT unsigned, + `v9` FLOAT, + `v10` FLOAT unsigned, + `v11` DOUBLE, + `v12` DOUBLE unsigned, + `v13` DECIMAL(5,4), + `v14` date, + `v15` TEXT, + `v16` varchar(100) , + `v17` BLOB, + `v18` 
datetime DEFAULT CURRENT_TIMESTAMP, + `v19` datetime(6) DEFAULT CURRENT_TIMESTAMP(6), + `v20` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `v21` TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6), + /* todo support */ + # `v22` YEAR, + # `v23` TIME, + # `v24` TIME(3), + # `v25` GEOMETRY, + `v26` bit(4), + # `v27` JSON DEFAULT NULL, + # `v28` set('a', 'c', 'f', 'd', 'e', 'b'), + `v29` mediumint(4) unsigned NOT NULL DEFAULT '0', + `v30` varbinary(255) DEFAULT NULL COMMENT 'varbinary support', + `v31` binary(200) DEFAULT NULL, + `v32` ENUM('RED','GREEN','BLUE'), + PRIMARY KEY (`v1`) + ) ENGINE=InnoDB; + """) + + mysql_node.query(""" + INSERT INTO test_database_datatype.t1 (v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v31, v32) values + (1, 11, 9223372036854775807, -1, 1, 11, 18446744073709551615, -1.1, 1.1, -1.111, 1.111, 1.1111, '2021-10-06', 'text', 'varchar', 'BLOB', '2021-10-06 18:32:57', '2021-10-06 18:32:57.482786', '2021-10-06 18:32:57', '2021-10-06 18:32:57.482786', b'1010', 11, 'varbinary', 'binary', 'RED'); + """) + clickhouse_node.query( + "CREATE DATABASE test_database_datatype ENGINE = MaterializeMySQL('{}:3306', 'test_database_datatype', 'root', 'clickhouse')".format( + service_name)) + + check_query(clickhouse_node, "SELECT name FROM system.tables WHERE database = 'test_database_datatype'", "t1\n") + # full synchronization check + check_query(clickhouse_node, "SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v32 FROM test_database_datatype.t1 FORMAT TSV", + "1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t10\t11\tvarbinary\tRED\n") + + mysql_node.query(""" + INSERT INTO test_database_datatype.t1 (v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v31, v32) values + (2, 22, 9223372036854775807, -2, 2, 22, 18446744073709551615, -2.2, 2.2, -2.22, 2.222, 2.2222, '2021-10-07', 'text', 'varchar', 'BLOB', '2021-10-07 18:32:57', '2021-10-07 18:32:57.482786', '2021-10-07 18:32:57', '2021-10-07 18:32:57.482786', b'1011', 22, 'varbinary', 'binary', 'GREEN' ); + """) + # increment synchronization check + check_query(clickhouse_node, "SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v32 FROM test_database_datatype.t1 ORDER BY v1 FORMAT TSV", + "1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t10\t11\tvarbinary\tRED\n" + + "2\t2\t22\t9223372036854775807\t-2\t2\t22\t18446744073709551615\t-2.2\t2.2\t-2.22\t2.222\t2.2222\t2021-10-07\ttext\tvarchar\tBLOB\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t11\t22\tvarbinary\tGREEN\n") diff --git a/tests/integration/test_materialized_mysql_database/test.py b/tests/integration/test_materialized_mysql_database/test.py index 5142a613799..501c0cd78fa 100644 --- a/tests/integration/test_materialized_mysql_database/test.py +++ b/tests/integration/test_materialized_mysql_database/test.py @@ -253,3 +253,7 @@ def test_table_table(started_cluster, started_mysql_8_0, started_mysql_5_7, clic def 
test_table_overrides(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node): materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_5_7, "mysql57") materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_8_0, "mysql80") + +def test_materialized_database_support_all_kinds_of_mysql_datatype(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node): + materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_8_0, "mysql80") + materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_5_7, "mysql57") diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index 1e607e94119..04981523432 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -456,3 +456,16 @@ def test_s3_disk_reads_on_unstable_connection(cluster, node_name): for i in range(30): print(f"Read sequence {i}") assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"] + + +@pytest.mark.parametrize("node_name", ["node"]) +def test_lazy_seek_optimization_for_async_read(cluster, node_name): + node = cluster.instances[node_name] + node.query("DROP TABLE IF EXISTS s3_test NO DELAY") + node.query("CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';") + node.query("INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 10000000") + node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10") + node.query("DROP TABLE IF EXISTS s3_test NO DELAY") + minio = cluster.minio_client + for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')): + minio.remove_object(cluster.minio_bucket, obj.object_name) diff --git a/tests/integration/test_merge_tree_s3_failover/test.py b/tests/integration/test_merge_tree_s3_failover/test.py index b6b47417523..44e7e0ae5ad 100644 --- a/tests/integration/test_merge_tree_s3_failover/test.py +++ b/tests/integration/test_merge_tree_s3_failover/test.py @@ -37,7 +37,6 @@ def fail_request(cluster, request): ["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)]) assert response == 'OK', 'Expected "OK", but got "{}"'.format(response) - def throttle_request(cluster, request): response = cluster.exec_in_container(cluster.get_container_id('resolver'), ["curl", "-s", "http://resolver:8080/throttle_request/{}".format(request)]) diff --git a/tests/integration/test_merge_tree_s3_restore/test.py b/tests/integration/test_merge_tree_s3_restore/test.py index babbea2beba..e12b69cdf17 100644 --- a/tests/integration/test_merge_tree_s3_restore/test.py +++ b/tests/integration/test_merge_tree_s3_restore/test.py @@ -7,6 +7,7 @@ import time import pytest from helpers.cluster import ClickHouseCluster, get_instances_dir + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml'.format(get_instances_dir())) COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/clusters.xml"] diff --git a/tests/integration/test_merge_tree_s3_with_cache/test.py b/tests/integration/test_merge_tree_s3_with_cache/test.py index e15eaf61812..be3d2709873 100644 --- a/tests/integration/test_merge_tree_s3_with_cache/test.py +++ b/tests/integration/test_merge_tree_s3_with_cache/test.py @@ -36,7 +36,6 
@@ def get_query_stat(instance, hint): result[ev[0]] = int(ev[1]) return result - @pytest.mark.parametrize("min_rows_for_wide_part,read_requests", [(0, 2), (8192, 1)]) def test_write_is_cached(cluster, min_rows_for_wide_part, read_requests): node = cluster.instances["node"] diff --git a/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py b/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py index 793abc53566..edf39969b47 100644 --- a/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py +++ b/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py @@ -65,7 +65,6 @@ def create_table(cluster, additional_settings=None): list(cluster.instances.values())[0].query(create_table_statement) - @pytest.fixture(autouse=True) def drop_table(cluster): yield diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 5342473aefa..2c2a9e41509 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -67,8 +67,8 @@ def rabbitmq_cluster(): def rabbitmq_setup_teardown(): print("RabbitMQ is available - running test") yield # run test - for table_name in ['view', 'consumer', 'rabbitmq']: - instance.query(f'DROP TABLE IF EXISTS test.{table_name}') + instance.query('DROP DATABASE test NO DELAY') + instance.query('CREATE DATABASE test') # Tests diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index bfeda84fa21..b4f54aa1e10 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -11,6 +11,7 @@ import helpers.client import pytest from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir from helpers.network import PartitionManager +from helpers.test_tools import exec_query_with_retry MINIO_INTERNAL_PORT = 9001 @@ -809,7 +810,7 @@ def test_seekable_formats(started_cluster): assert(int(result) == 5000000) table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')" - instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)") + exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)") result = instance.query(f"SELECT count() FROM {table_function}") assert(int(result) == 5000000) @@ -832,7 +833,7 @@ def test_seekable_formats_url(started_cluster): assert(int(result) == 5000000) table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')" - instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)") + exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)") table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')" result = instance.query(f"SELECT count() FROM {table_function}") @@ -842,3 +843,18 @@ def test_seekable_formats_url(started_cluster): result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM url') AND memory_usage > 0 ORDER BY event_time desc") print(result[:3]) assert(int(result[:3]) < 200) + + +def test_empty_file(started_cluster): + bucket = started_cluster.minio_bucket + instance = started_cluster.instances["dummy"] + + name = "empty" + url = 
f'http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{name}' + + minio = started_cluster.minio_client + minio.put_object(bucket, name, io.BytesIO(b""), 0) + + table_function = f"s3('{url}', 'CSV', 'id Int32')" + result = instance.query(f"SELECT count() FROM {table_function}") + assert(int(result) == 0) diff --git a/tests/queries/0_stateless/02006_test_positional_arguments.reference b/tests/queries/0_stateless/02006_test_positional_arguments.reference index 7b75ab43430..5fc070ffd0b 100644 --- a/tests/queries/0_stateless/02006_test_positional_arguments.reference +++ b/tests/queries/0_stateless/02006_test_positional_arguments.reference @@ -46,22 +46,6 @@ select x1, x2, x3 from test order by 3 limit 1 by 1; 100 100 1 10 1 10 1 10 100 -explain syntax select x3, x2, x1 from test order by 1 + 1; -SELECT - x3, - x2, - x1 -FROM test -ORDER BY x3 + x3 ASC -explain syntax select x3, x2, x1 from test order by (1 + 1) * 3; -SELECT - x3, - x2, - x1 -FROM test -ORDER BY (x3 + x3) * x1 ASC -select x2, x1 from test group by x2 + x1; -- { serverError 215 } -select x2, x1 from test group by 1 + 2; -- { serverError 215 } explain syntax select x3, x2, x1 from test order by 1; SELECT x3, @@ -110,27 +94,6 @@ GROUP BY x2 select max(x1), x2 from test group by 1, 2; -- { serverError 43 } select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 } -select x1 + x2, x3 from test group by x1 + x2, x3; -11 100 -200 1 -11 200 -11 10 -select x3, x2, x1 from test order by x3 * 2, x2, x1; -- check x3 * 2 does not become x3 * x2 -1 100 100 -1 100 100 -10 1 10 -100 10 1 -200 1 10 -200 10 1 -explain syntax select x1, x3 from test group by 1 + 2, 1, 2; -SELECT - x1, - x3 -FROM test -GROUP BY - x1 + x3, - x1, - x3 explain syntax select x1 + x3, x3 from test group by 1, 2; SELECT x1 + x3, @@ -152,3 +115,5 @@ SELECT 1 + 1 AS a GROUP BY a select substr('aaaaaaaaaaaaaa', 8) as a group by a; aaaaaaa +select substr('aaaaaaaaaaaaaa', 8) as a group by substr('aaaaaaaaaaaaaa', 8); +aaaaaaa diff --git a/tests/queries/0_stateless/02006_test_positional_arguments.sql b/tests/queries/0_stateless/02006_test_positional_arguments.sql index 3ba01b47efa..3a2cf76f6c4 100644 --- a/tests/queries/0_stateless/02006_test_positional_arguments.sql +++ b/tests/queries/0_stateless/02006_test_positional_arguments.sql @@ -22,12 +22,6 @@ select x1, x2, x3 from test order by 3 limit 1 by 3; select x1, x2, x3 from test order by x3 limit 1 by x1; select x1, x2, x3 from test order by 3 limit 1 by 1; -explain syntax select x3, x2, x1 from test order by 1 + 1; -explain syntax select x3, x2, x1 from test order by (1 + 1) * 3; - -select x2, x1 from test group by x2 + x1; -- { serverError 215 } -select x2, x1 from test group by 1 + 2; -- { serverError 215 } - explain syntax select x3, x2, x1 from test order by 1; explain syntax select x3 + 1, x2, x1 from test order by 1; explain syntax select x3, x3 - x2, x2, x1 from test order by 2; @@ -37,11 +31,7 @@ explain syntax select 1 + greatest(x1, 1), x2 from test group by 1, 2; select max(x1), x2 from test group by 1, 2; -- { serverError 43 } select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 } -select x1 + x2, x3 from test group by x1 + x2, x3; -select x3, x2, x1 from test order by x3 * 2, x2, x1; -- check x3 * 2 does not become x3 * x2 - -explain syntax select x1, x3 from test group by 1 + 2, 1, 2; explain syntax select x1 + x3, x3 from test group by 1, 2; create table test2(x1 Int, x2 Int, x3 Int) engine=Memory; @@ -52,3 +42,5 @@ select a, b, c, d, e, f from (select 44 a, 88 b, 13 
c, 14 d, 15 e, 16 f) t grou explain syntax select plus(1, 1) as a group by a; select substr('aaaaaaaaaaaaaa', 8) as a group by a; +select substr('aaaaaaaaaaaaaa', 8) as a group by substr('aaaaaaaaaaaaaa', 8); + diff --git a/tests/queries/0_stateless/02007_ipv4_and_ipv6_to_and_from_string.reference b/tests/queries/0_stateless/02007_ipv4_and_ipv6_to_and_from_string.reference new file mode 100644 index 00000000000..8a4df1605fb --- /dev/null +++ b/tests/queries/0_stateless/02007_ipv4_and_ipv6_to_and_from_string.reference @@ -0,0 +1,6 @@ +127.0.0.1 IPv4 +127.0.0.1 String +2001:db8:0:85a3::ac1f:8001 IPv6 +2001:db8:0:85a3::ac1f:8001 String +0.0.0.0 IPv4 +:: IPv6 diff --git a/tests/queries/0_stateless/02007_ipv4_and_ipv6_to_and_from_string.sql b/tests/queries/0_stateless/02007_ipv4_and_ipv6_to_and_from_string.sql new file mode 100644 index 00000000000..2fcc20b9811 --- /dev/null +++ b/tests/queries/0_stateless/02007_ipv4_and_ipv6_to_and_from_string.sql @@ -0,0 +1,13 @@ +SELECT CAST('127.0.0.1' as IPv4) as v, toTypeName(v); +SELECT CAST(toIPv4('127.0.0.1') as String) as v, toTypeName(v); + +SELECT CAST('2001:0db8:0000:85a3:0000:0000:ac1f:8001' as IPv6) as v, toTypeName(v); +SELECT CAST(toIPv6('2001:0db8:0000:85a3:0000:0000:ac1f:8001') as String) as v, toTypeName(v); + +SELECT toIPv4('hello') as v, toTypeName(v); +SELECT toIPv6('hello') as v, toTypeName(v); + +SELECT CAST('hello' as IPv4) as v, toTypeName(v); -- { serverError CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING } +SELECT CAST('hello' as IPv6) as v, toTypeName(v); -- { serverError CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING } + +SELECT CAST('1.1.1.1' as IPv6) as v, toTypeName(v); -- { serverError CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING } diff --git a/tests/queries/0_stateless/02010_lc_native.python b/tests/queries/0_stateless/02010_lc_native.python index 56e981555f3..71965512e64 100755 --- a/tests/queries/0_stateless/02010_lc_native.python +++ b/tests/queries/0_stateless/02010_lc_native.python @@ -302,11 +302,44 @@ def insertLowCardinalityRowWithIncorrectDictType(): print(readException(s)) s.close() +def insertLowCardinalityRowWithIncorrectAdditionalKeys(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(30) + s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT)) + sendHello(s) + receiveHello(s) + sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE)) + + # external tables + sendEmptyBlock(s) + readHeader(s) + + # Data + ba = bytearray() + writeVarUInt(2, ba) # Data + writeStringBinary('', ba) + serializeBlockInfo(ba) + writeVarUInt(1, ba) # rows + writeVarUInt(1, ba) # columns + writeStringBinary('x', ba) + writeStringBinary('LowCardinality(String)', ba) + ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys + ba.extend([3, 0] + [0] * 6) # indexes type: UInt64 [3], with NO additional keys [0] + ba.extend([1] + [0] * 7) # num_keys in dict + writeStringBinary('hello', ba) # key + ba.extend([1] + [0] * 7) # num_indexes + ba.extend([0] * 8) # UInt64 index (0 for 'hello') + s.sendall(ba) + + assertPacket(readVarUInt(s), 2) + print(readException(s)) + s.close() def main(): insertValidLowCardinalityRow() insertLowCardinalityRowWithIndexOverflow() insertLowCardinalityRowWithIncorrectDictType() + insertLowCardinalityRowWithIncorrectAdditionalKeys() if __name__ == "__main__": main() diff --git a/tests/queries/0_stateless/02010_lc_native.reference b/tests/queries/0_stateless/02010_lc_native.reference index 0167f05c952..bbf0c9c025d 100644 --- a/tests/queries/0_stateless/02010_lc_native.reference +++ 
b/tests/queries/0_stateless/02010_lc_native.reference @@ -6,3 +6,6 @@ code 117: Index for LowCardinality is out of range. Dictionary size is 1, but f Rows 0 Columns 1 Column x type LowCardinality(String) code 117: LowCardinality indexes serialization type for Native format cannot use global dictionary +Rows 0 Columns 1 +Column x type LowCardinality(String) +code 117: No additional keys found.
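Note on the LowCardinality Native-format test above: insertLowCardinalityRowWithIncorrectAdditionalKeys assembles a raw protocol block by hand with helpers such as writeVarUInt and writeStringBinary, whose bodies live elsewhere in 02010_lc_native.python and are outside this diff. ClickHouse's Native format encodes counts (rows, columns, string lengths) as unsigned LEB128 varints, which is what those helpers produce; the sketch below shows that encoding in isolation, with function names local to the sketch rather than taken from the test.

# Illustrative sketch only: LEB128-style varint encoding as relied on by the
# Native protocol helpers (writeVarUInt / writeStringBinary) in 02010_lc_native.python.

def write_var_uint(x: int, ba: bytearray) -> None:
    """Append x as an unsigned LEB128 varint: 7 payload bits per byte."""
    while True:
        byte = x & 0x7F
        x >>= 7
        if x:
            ba.append(byte | 0x80)  # continuation bit set: more bytes follow
        else:
            ba.append(byte)
            return


def write_string_binary(s: str, ba: bytearray) -> None:
    """Native-format string: varint length followed by the raw UTF-8 bytes."""
    data = s.encode('utf-8')
    write_var_uint(len(data), ba)
    ba.extend(data)


if __name__ == "__main__":
    ba = bytearray()
    write_var_uint(300, ba)       # 300 -> 0xAC 0x02
    write_string_binary('x', ba)  # 0x01 then b'x'
    print(ba.hex())               # prints "ac020178"

Read with that encoding in mind, the bytearray in the test is: packet type 2 (Data), an empty table name, block info, one row, one column, the column name 'x', its type 'LowCardinality(String)', and then the serialization flags, dictionary keys and indexes that the server is expected to reject with error code 117.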