Merge remote-tracking branch 'blessed/master' into kill_scalar_github

commit 1d9916eb25
Author: Raúl Marín
Date:   2021-12-22 16:42:10 +01:00

78 changed files with 890 additions and 335 deletions


@@ -920,7 +920,7 @@ jobs:
       - BuilderDebMsan
       - BuilderDebDebug
     runs-on: [self-hosted, style-checker]
-    if: always()
+    if: ${{ success() || failure() }}
     steps:
       - name: Set envs
         run: |
@@ -960,7 +960,7 @@ jobs:
       - BuilderBinDarwinAarch64
       - BuilderBinPPC64
     runs-on: [self-hosted, style-checker]
-    if: always()
+    if: ${{ success() || failure() }}
     steps:
       - name: Set envs
         run: |

.github/workflows/woboq.yml (new file, 42 lines)

@@ -0,0 +1,42 @@
name: WoboqBuilder
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
concurrency:
group: woboq
on: # yamllint disable-line rule:truthy
schedule:
- cron: '0 */18 * * *'
workflow_dispatch:
jobs:
# don't use dockerhub push because this image updates so rarely
WoboqCodebrowser:
runs-on: [self-hosted, style-checker]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/codebrowser
REPO_COPY=${{runner.temp}}/codebrowser/ClickHouse
IMAGES_PATH=${{runner.temp}}/images_path
EOF
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'true'
- name: Codebrowser
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci && python3 codebrowser_check.py
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH


@@ -27,8 +27,7 @@ execute_process(COMMAND uname -m OUTPUT_VARIABLE ARCH)
 if (OS MATCHES "Linux"
     AND NOT DEFINED CMAKE_TOOLCHAIN_FILE
     AND NOT DISABLE_HERMETIC_BUILD
-    AND ($ENV{CC} MATCHES ".*clang.*" OR CMAKE_C_COMPILER MATCHES ".*clang.*")
-    AND (USE_STATIC_LIBRARIES OR NOT DEFINED USE_STATIC_LIBRARIES))
+    AND ($ENV{CC} MATCHES ".*clang.*" OR CMAKE_C_COMPILER MATCHES ".*clang.*"))
     if (ARCH MATCHES "amd64|x86_64")
         set (CMAKE_TOOLCHAIN_FILE "cmake/linux/toolchain-x86_64.cmake" CACHE INTERNAL "" FORCE)


@@ -21,4 +21,12 @@ Value Row::operator[] (const char * name) const
     throw Exception(std::string("Unknown column ") + name);
 }
 
+enum enum_field_types Row::getFieldType(size_t i)
+{
+    if (i >= res->getNumFields())
+        throw Exception(std::string("Array Index Overflow"));
+    MYSQL_FIELDS fields = res->getFields();
+    return fields[i].type;
+}
+
 }
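The accessor added above is what lets the MaterializedMySQL code at the end of this diff branch on the server-side field type; that code compares against the literal 16 (MYSQL_TYPE_BIT) because the enum itself is not visible through mysqlxx. A hedged sketch of the call pattern, with a helper name of my own choosing:

```cpp
#include <cstddef>
#include <mysqlxx/Row.h>

// Hypothetical helper (not part of the commit): scan a fetched row's columns and
// report whether any of them is a MySQL BIT column. The literal 16 is the numeric
// value of MYSQL_TYPE_BIT, the same literal used later in this diff.
bool hasBitColumn(mysqlxx::Row & row, size_t num_fields)
{
    for (size_t i = 0; i < num_fields; ++i)
        if (static_cast<int>(row.getFieldType(i)) == 16)
            return true;  // getFieldType() throws if the index is out of range
    return false;
}
```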


@@ -79,6 +79,8 @@ public:
      */
     operator private_bool_type() const { return row == nullptr ? nullptr : &Row::row; }
 
+    enum enum_field_types getFieldType(size_t i);
+
 private:
     MYSQL_ROW row{};
     ResultBase * res{};


@@ -16,6 +16,8 @@ using MYSQL_ROW = char**;
 struct st_mysql_field;
 using MYSQL_FIELD = st_mysql_field;
 
+enum struct enum_field_types;
+
 #endif
 
 namespace mysqlxx


@@ -14,9 +14,12 @@ set (TOOLCHAIN_PATH "${CMAKE_CURRENT_LIST_DIR}/../../contrib/sysroot/linux-x86_6
 set (CMAKE_SYSROOT "${TOOLCHAIN_PATH}/x86_64-linux-gnu/libc")
 
-set (CMAKE_C_FLAGS_INIT "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
-set (CMAKE_CXX_FLAGS_INIT "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
-set (CMAKE_ASM_FLAGS_INIT "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
+set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
+set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
+set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
+set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
+set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
+set (CMAKE_MODULE_LINKER_FLAGS "${CMAKE_MODULE_LINKER_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
 
 set (HAS_PRE_1970_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
 set (HAS_PRE_1970_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)

contrib/cassandra (vendored submodule)
@@ -1 +1 @@
-Subproject commit eb9b68dadbb4417a2c132ad4a1c2fa76e65e6fc1
+Subproject commit f4a31e92a25c34c02c7291ff97c7813bc83b0e09

contrib/sysroot (vendored submodule)
@@ -1 +1 @@
-Subproject commit 410845187f582c5e6692b53dddbe43efbb728734
+Subproject commit bbcac834526d90d1e764164b861be426891d1743


@@ -6,7 +6,7 @@ FROM clickhouse/binary-builder
 
 ARG apt_archive="http://archive.ubuntu.com"
 RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
-RUN apt-get update && apt-get --yes --allow-unauthenticated install clang-9 libllvm9 libclang-9-dev
+RUN apt-get update && apt-get --yes --allow-unauthenticated install clang-13 libllvm13 libclang-13-dev
 
 # repo versions doesn't work correctly with C++17
 # also we push reports to s3, so we add index.html to subfolder urls
@@ -23,12 +23,12 @@ ENV SOURCE_DIRECTORY=/repo_folder
 ENV BUILD_DIRECTORY=/build
 ENV HTML_RESULT_DIRECTORY=$BUILD_DIRECTORY/html_report
 ENV SHA=nosha
-ENV DATA="data"
+ENV DATA="https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/data"
 
 CMD mkdir -p $BUILD_DIRECTORY && cd $BUILD_DIRECTORY && \
     cmake $SOURCE_DIRECTORY -DCMAKE_CXX_COMPILER=/usr/bin/clang\+\+-13 -DCMAKE_C_COMPILER=/usr/bin/clang-13 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON -DENABLE_EMBEDDED_COMPILER=0 -DENABLE_S3=0 && \
     mkdir -p $HTML_RESULT_DIRECTORY && \
     $CODEGEN -b $BUILD_DIRECTORY -a -o $HTML_RESULT_DIRECTORY -p ClickHouse:$SOURCE_DIRECTORY:$SHA -d $DATA | ts '%Y-%m-%d %H:%M:%S' && \
     cp -r $STATIC_DATA $HTML_RESULT_DIRECTORY/ &&\
-    $CODEINDEX $HTML_RESULT_DIRECTORY -d $DATA | ts '%Y-%m-%d %H:%M:%S' && \
+    $CODEINDEX $HTML_RESULT_DIRECTORY -d "$DATA" | ts '%Y-%m-%d %H:%M:%S' && \
     mv $HTML_RESULT_DIRECTORY /test_output


@@ -83,6 +83,7 @@ When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree](
 | VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) |
 | BLOB | [String](../../sql-reference/data-types/string.md) |
 | BINARY | [FixedString](../../sql-reference/data-types/fixedstring.md) |
+| BIT | [UInt64](../../sql-reference/data-types/int-uint.md) |
 
 [Nullable](../../sql-reference/data-types/nullable.md) is supported.


@@ -14,11 +14,11 @@ To enable Kerberos, one should include `kerberos` section in `config.xml`. This
 #### Parameters:
 
 - `principal` - canonical service principal name that will be acquired and used when accepting security contexts.
   - This parameter is optional, if omitted, the default principal will be used.
 
 - `realm` - a realm, that will be used to restrict authentication to only those requests whose initiator's realm matches it.
   - This parameter is optional, if omitted, no additional filtering by realm will be applied.
 
 Example (goes into `config.xml`):
@@ -75,7 +75,7 @@ In order to enable Kerberos authentication for the user, specify `kerberos` sect
 Parameters:
 
 - `realm` - a realm that will be used to restrict authentication to only those requests whose initiator's realm matches it.
   - This parameter is optional, if omitted, no additional filtering by realm will be applied.
 
 Example (goes into `users.xml`):


@@ -31,7 +31,7 @@ CREATE ROLE accountant;
 GRANT SELECT ON db.* TO accountant;
 ```
 
-This sequence of queries creates the role `accountant` that has the privilege of reading data from the `accounting` database.
+This sequence of queries creates the role `accountant` that has the privilege of reading data from the `db` database.
 
 Assigning the role to the user `mira`:


@@ -175,6 +175,11 @@ public:
         chars.reserve(n * size);
     }
 
+    void resize(size_t size)
+    {
+        chars.resize(n * size);
+    }
+
     void getExtremes(Field & min, Field & max) const override;
 
     bool structureEquals(const IColumn & rhs) const override


@@ -0,0 +1,91 @@
#pragma once
#include <cstring>
#include <cassert>
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <IO/WriteBufferFromVector.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_STRING_SIZE;
}
namespace ColumnStringHelpers
{
/** Simplifies writing data to the ColumnString or ColumnFixedString via WriteBuffer.
*
* Take care of little subtle details, like padding or proper offsets.
*/
template <typename ColumnType>
class WriteHelper
{
ColumnType & col;
WriteBufferFromVector<typename ColumnType::Chars> buffer;
size_t prev_row_buffer_size = 0;
static ColumnType & resizeColumn(ColumnType & column, size_t rows)
{
if constexpr (std::is_same_v<ColumnType, ColumnFixedString>)
column.resize(rows);
else
{
column.getOffsets().reserve(rows);
/// Using coefficient 2 for initial size is arbitrary.
column.getChars().resize(rows * 2);
}
return column;
}
public:
WriteHelper(ColumnType & col_, size_t expected_rows)
: col(resizeColumn(col_, expected_rows))
, buffer(col.getChars())
{}
~WriteHelper() = default;
void finalize()
{
buffer.finalize();
}
auto & getWriteBuffer()
{
return buffer;
}
inline void rowWritten()
{
if constexpr (std::is_same_v<ColumnType, ColumnFixedString>)
{
if (buffer.count() > prev_row_buffer_size + col.getN())
throw Exception(
ErrorCodes::TOO_LARGE_STRING_SIZE,
"Too large string for FixedString column");
// Pad with zeroes on the right to maintain FixedString invariant.
const auto excess_bytes = buffer.count() % col.getN();
const auto fill_bytes = col.getN() - excess_bytes;
writeChar(0, fill_bytes, buffer);
}
else
{
writeChar(0, buffer);
col.getOffsets().push_back(buffer.count());
}
prev_row_buffer_size = buffer.count();
}
};
}
}
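A usage sketch of the helper above (my own illustration, not part of the commit): construct it over the destination column with the expected row count, serialize each value into the shared write buffer, call rowWritten() after every row, and finalize() once at the end.

```cpp
#include <Columns/ColumnString.h>
#include <Columns/ColumnStringHelpers.h>
#include <IO/WriteHelpers.h>

// Hypothetical example: fill a ColumnString with the decimal text of 0..rows-1
// through the WriteHelper defined above.
static void fillWithNumbers(DB::ColumnString & col, size_t rows)
{
    DB::ColumnStringHelpers::WriteHelper write_helper(col, rows);
    auto & buf = write_helper.getWriteBuffer();
    for (size_t i = 0; i < rows; ++i)
    {
        DB::writeIntText(i, buf);   // serialize one value into the column's chars
        write_helper.rowWritten();  // appends the row terminator and records the offset
    }
    write_helper.finalize();        // flushes the buffer and trims it to the bytes written
}
```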


@@ -41,24 +41,6 @@ namespace ErrorCodes
 /// For cutting preprocessed path to this base
 static std::string main_config_path;
 
-/// Extracts from a string the first encountered number consisting of at least two digits.
-static std::string numberFromHost(const std::string & s)
-{
-    for (size_t i = 0; i < s.size(); ++i)
-    {
-        std::string res;
-        size_t j = i;
-        while (j < s.size() && isNumericASCII(s[j]))
-            res += s[j++];
-        if (res.size() >= 2)
-        {
-            while (res[0] == '0')
-                res.erase(res.begin());
-            return res;
-        }
-    }
-    return "";
-}
 
 bool ConfigProcessor::isPreprocessedFile(const std::string & path)
 {
@@ -245,19 +227,6 @@ void ConfigProcessor::merge(XMLDocumentPtr config, XMLDocumentPtr with)
     mergeRecursive(config, config_root, with_root);
 }
 
-static std::string layerFromHost()
-{
-    struct utsname buf;
-    if (uname(&buf))
-        throw Poco::Exception(std::string("uname failed: ") + errnoToString(errno));
-
-    std::string layer = numberFromHost(buf.nodename);
-    if (layer.empty())
-        throw Poco::Exception(std::string("no layer in host name: ") + buf.nodename);
-
-    return layer;
-}
 
 void ConfigProcessor::doIncludesRecursive(
     XMLDocumentPtr config,
     XMLDocumentPtr include_from,
@@ -288,18 +257,6 @@ void ConfigProcessor::doIncludesRecursive(
     if (node->nodeType() != Node::ELEMENT_NODE)
         return;
 
-    /// Substitute <layer> for the number extracted from the hostname only if there is an
-    /// empty <layer> tag without attributes in the original file.
-    if (node->nodeName() == "layer"
-        && !node->hasAttributes()
-        && !node->hasChildNodes()
-        && node->nodeValue().empty())
-    {
-        NodePtr new_node = config->createTextNode(layerFromHost());
-        node->appendChild(new_node);
-        return;
-    }
-
     std::map<std::string, const Node *> attr_nodes;
     NamedNodeMapPtr attributes = node->attributes();
     size_t substs_count = 0;


@@ -59,7 +59,6 @@ public:
     /// 4) If zk_node_cache is non-NULL, replace elements matching the "<foo from_zk="/bar">" pattern with
     ///    "<foo>contents of the /bar ZooKeeper node</foo>".
     ///    If has_zk_includes is non-NULL and there are such elements, set has_zk_includes to true.
-    /// 5) (Yandex.Metrika-specific) Substitute "<layer/>" with "<layer>layer number from the hostname</layer>".
     XMLDocumentPtr processConfig(
         bool * has_zk_includes = nullptr,
         zkutil::ZooKeeperNodeCache * zk_node_cache = nullptr,


@@ -259,6 +259,8 @@
     M(RemoteFSUnusedPrefetches, "Number of prefetches pending at buffer destruction") \
     M(RemoteFSPrefetchedReads, "Number of reads from prefecthed buffer") \
     M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \
+    M(RemoteFSLazySeeks, "Number of lazy seeks") \
+    M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \
     M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
     \
     M(ReadBufferSeekCancelConnection, "Number of seeks which lead to new connection (s3, http)") \


@@ -230,6 +230,7 @@ namespace MySQLReplication
                     pos += 2;
                     break;
                 }
+                case MYSQL_TYPE_BIT:
                 case MYSQL_TYPE_VARCHAR:
                 case MYSQL_TYPE_VAR_STRING: {
                     /// Little-Endian
@@ -584,6 +585,15 @@ namespace MySQLReplication
                         }
                         break;
                     }
+                    case MYSQL_TYPE_BIT:
+                    {
+                        UInt32 bits = ((meta >> 8) * 8) + (meta & 0xff);
+                        UInt32 size = (bits + 7) / 8;
+                        UInt64 val = 0UL;
+                        readBigEndianStrict(payload, reinterpret_cast<char *>(&val), size);
+                        row.push_back(val);
+                        break;
+                    }
                     case MYSQL_TYPE_VARCHAR:
                     case MYSQL_TYPE_VAR_STRING:
                     {
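For reference, the width arithmetic in the new MYSQL_TYPE_BIT branch above treats the high byte of meta as a count of whole bytes and the low byte as the leftover bits. A self-contained check with example values of my own, assuming that layout:

```cpp
#include <cassert>
#include <cstdint>

int main()
{
    // Hypothetical metadata for a BIT(20) column: 2 full bytes plus 4 extra bits.
    uint16_t meta = (2u << 8) | 4u;
    uint32_t bits = ((meta >> 8) * 8) + (meta & 0xff);  // same formula as in the patch
    uint32_t size = (bits + 7) / 8;                     // bytes read big-endian into a UInt64
    assert(bits == 20);
    assert(size == 3);
    return 0;
}
```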


@@ -59,6 +59,7 @@ class DataTypeDecimalBase : public IDataType
 public:
     using FieldType = T;
     using ColumnType = ColumnDecimal<T>;
+    static constexpr auto type_id = TypeId<T>;
 
     static constexpr bool is_parametric = true;


@@ -38,6 +38,7 @@ class DataTypeEnum final : public IDataTypeEnum, public EnumValues<Type>
 public:
     using FieldType = Type;
     using ColumnType = ColumnVector<FieldType>;
+    static constexpr auto type_id = sizeof(FieldType) == 1 ? TypeIndex::Enum8 : TypeIndex::Enum16;
     using typename EnumValues<Type>::Values;
 
     static constexpr bool is_parametric = true;
@@ -52,7 +53,7 @@ public:
     std::string doGetName() const override { return type_name; }
     const char * getFamilyName() const override;
 
-    TypeIndex getTypeId() const override { return sizeof(FieldType) == 1 ? TypeIndex::Enum8 : TypeIndex::Enum16; }
+    TypeIndex getTypeId() const override { return type_id; }
 
     FieldType readValue(ReadBuffer & istr) const
     {


@@ -10,6 +10,8 @@
 namespace DB
 {
 
+class ColumnFixedString;
+
 namespace ErrorCodes
 {
     extern const int ARGUMENT_OUT_OF_BOUND;
@@ -22,7 +24,10 @@ private:
     size_t n;
 
 public:
+    using ColumnType = ColumnFixedString;
+
     static constexpr bool is_parametric = true;
+    static constexpr auto type_id = TypeIndex::FixedString;
 
     DataTypeFixedString(size_t n_) : n(n_)
     {
@@ -33,7 +38,7 @@ public:
     }
 
     std::string doGetName() const override;
-    TypeIndex getTypeId() const override { return TypeIndex::FixedString; }
+    TypeIndex getTypeId() const override { return type_id; }
 
     const char * getFamilyName() const override { return "FixedString"; }


@@ -20,6 +20,7 @@ class DataTypeNumberBase : public IDataType
 public:
     static constexpr bool is_parametric = false;
     static constexpr auto family_name = TypeName<T>;
+    static constexpr auto type_id = TypeId<T>;
 
     using FieldType = T;
     using ColumnType = ColumnVector<T>;


@@ -6,10 +6,13 @@
 namespace DB
 {
 
+class ColumnString;
+
 class DataTypeString final : public IDataType
 {
 public:
     using FieldType = String;
+    using ColumnType = ColumnString;
     static constexpr bool is_parametric = false;
     static constexpr auto type_id = TypeIndex::String;


@@ -15,9 +15,10 @@ public:
     using FieldType = UUID;
     using ColumnType = ColumnVector<UUID>;
+    static constexpr auto type_id = TypeIndex::UUID;
 
     const char * getFamilyName() const override { return "UUID"; }
-    TypeIndex getTypeId() const override { return TypeIndex::UUID; }
+    TypeIndex getTypeId() const override { return type_id; }
 
     Field getDefault() const override;
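These type_id constants expose the type index at compile time; the conversion code later in this commit relies on that when it evaluates WhichDataType(FromDataType::type_id).isStringOrFixedString() inside an if constexpr. A minimal sketch of the same idea (the helper and static_asserts are my own):

```cpp
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>

// With type_id being a static constexpr member, the branch can be resolved at
// compile time instead of calling the virtual getTypeId() on an instance.
template <typename DataType>
constexpr bool isStringLike()
{
    return DataType::type_id == DB::TypeIndex::String
        || DataType::type_id == DB::TypeIndex::FixedString;
}

static_assert(isStringLike<DB::DataTypeString>());
static_assert(isStringLike<DB::DataTypeFixedString>());
static_assert(!isStringLike<DB::DataTypeUInt64>());
```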


@@ -86,6 +86,7 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
     factory.registerAlias("INT UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive);
     factory.registerAlias("INTEGER UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive);
     factory.registerAlias("BIGINT UNSIGNED", "UInt64", DataTypeFactory::CaseInsensitive);
+    factory.registerAlias("BIT", "UInt64", DataTypeFactory::CaseInsensitive);
 }
 
 }


@@ -636,6 +636,9 @@ void SerializationLowCardinality::deserializeBinaryBulkWithMultipleStreams(
         if (!low_cardinality_state->index_type.need_global_dictionary)
         {
+            if(additional_keys == nullptr)
+                throw Exception("No additional keys found.", ErrorCodes::INCORRECT_DATA);
+
             ColumnPtr keys_column = additional_keys;
             if (low_cardinality_state->null_map)
                 keys_column = ColumnNullable::create(additional_keys, low_cardinality_state->null_map);
@@ -662,6 +665,9 @@ void SerializationLowCardinality::deserializeBinaryBulkWithMultipleStreams(
         if (!maps.additional_keys_map->empty())
         {
+            if(additional_keys == nullptr)
+                throw Exception("No additional keys found.", ErrorCodes::INCORRECT_DATA);
+
             auto used_add_keys = additional_keys->index(*maps.additional_keys_map, 0);
 
             if (dictionary_type->isNullable())


@@ -91,6 +91,10 @@ DataTypePtr convertMySQLDataType(MultiEnum<MySQLDataTypesSupport> type_support,
             res = std::make_shared<DataTypeDateTime64>(scale);
         }
     }
+    else if (type_name == "bit")
+    {
+        res = std::make_shared<DataTypeUInt64>();
+    }
     else if (type_support.isSet(MySQLDataTypesSupport::DECIMAL) && (type_name == "numeric" || type_name == "decimal"))
     {
         if (precision <= DecimalUtils::max_precision<Decimal32>)


@@ -21,6 +21,8 @@ namespace ProfileEvents
     extern const Event RemoteFSUnusedPrefetches;
     extern const Event RemoteFSPrefetchedReads;
     extern const Event RemoteFSUnprefetchedReads;
+    extern const Event RemoteFSLazySeeks;
+    extern const Event RemoteFSSeeksWithReset;
     extern const Event RemoteFSBuffers;
 }
@@ -152,11 +154,16 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
         CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
         Stopwatch watch;
         {
-            size = prefetch_future.get();
+            auto result = prefetch_future.get();
+            size = result.size;
+            auto offset = result.offset;
+            assert(offset < size);
+
             if (size)
             {
                 memory.swap(prefetch_buffer);
-                set(memory.data(), memory.size());
+                size -= offset;
+                set(memory.data() + offset, size);
                 working_buffer.resize(size);
                 file_offset_of_buffer_end += size;
             }
@@ -168,16 +175,23 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
     else
     {
         ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
-        size = readInto(memory.data(), memory.size()).get();
+        auto result = readInto(memory.data(), memory.size()).get();
+        size = result.size;
+        auto offset = result.offset;
+        assert(offset < size);
+
         if (size)
         {
-            set(memory.data(), memory.size());
+            size -= offset;
+            set(memory.data() + offset, size);
             working_buffer.resize(size);
             file_offset_of_buffer_end += size;
         }
     }
 
+    if (file_offset_of_buffer_end != impl->offset())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected equality {} == {}. It's a bug", file_offset_of_buffer_end, impl->offset());
+
     prefetch_future = {};
     return size;
 }
@@ -231,18 +245,22 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
     pos = working_buffer.end();
 
-    /// Note: we read in range [file_offset_of_buffer_end, read_until_position).
-    if (read_until_position && file_offset_of_buffer_end < *read_until_position
-        && static_cast<off_t>(file_offset_of_buffer_end) >= getPosition()
-        && static_cast<off_t>(file_offset_of_buffer_end) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
+    /**
+     * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
+     * Note: we read in range [file_offset_of_buffer_end, read_until_position).
+     */
+    off_t file_offset_before_seek = impl->offset();
+    if (impl->initialized()
+        && read_until_position && file_offset_of_buffer_end < *read_until_position
+        && static_cast<off_t>(file_offset_of_buffer_end) > file_offset_before_seek
+        && static_cast<off_t>(file_offset_of_buffer_end) < file_offset_before_seek + static_cast<off_t>(min_bytes_for_seek))
     {
-        /**
-         * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
-         */
-        bytes_to_ignore = file_offset_of_buffer_end - getPosition();
+        ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks);
+        bytes_to_ignore = file_offset_of_buffer_end - file_offset_before_seek;
     }
     else
     {
+        ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
         impl->reset();
     }


@ -65,7 +65,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata
} }
size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) ReadBufferFromRemoteFSGather::ReadResult ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
{ {
/** /**
* Set `data` to current working and internal buffers. * Set `data` to current working and internal buffers.
@ -73,23 +73,24 @@ size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t o
*/ */
set(data, size); set(data, size);
absolute_position = offset; file_offset_of_buffer_end = offset;
bytes_to_ignore = ignore; bytes_to_ignore = ignore;
if (bytes_to_ignore)
assert(initialized());
auto result = nextImpl(); auto result = nextImpl();
bytes_to_ignore = 0;
if (result) if (result)
return working_buffer.size(); return {working_buffer.size(), BufferBase::offset()};
return 0; return {0, 0};
} }
void ReadBufferFromRemoteFSGather::initialize() void ReadBufferFromRemoteFSGather::initialize()
{ {
/// One clickhouse file can be split into multiple files in remote fs. /// One clickhouse file can be split into multiple files in remote fs.
auto current_buf_offset = absolute_position; auto current_buf_offset = file_offset_of_buffer_end;
for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i) for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i)
{ {
const auto & [file_path, size] = metadata.remote_fs_objects[i]; const auto & [file_path, size] = metadata.remote_fs_objects[i];
@ -144,7 +145,6 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
return readImpl(); return readImpl();
} }
bool ReadBufferFromRemoteFSGather::readImpl() bool ReadBufferFromRemoteFSGather::readImpl()
{ {
swap(*current_buf); swap(*current_buf);
@ -155,15 +155,26 @@ bool ReadBufferFromRemoteFSGather::readImpl()
* we save how many bytes need to be ignored (new_offset - position() bytes). * we save how many bytes need to be ignored (new_offset - position() bytes).
*/ */
if (bytes_to_ignore) if (bytes_to_ignore)
{
current_buf->ignore(bytes_to_ignore); current_buf->ignore(bytes_to_ignore);
bytes_to_ignore = 0;
}
auto result = current_buf->next(); bool result = current_buf->hasPendingData();
if (result)
{
/// bytes_to_ignore already added.
file_offset_of_buffer_end += current_buf->available();
}
else
{
result = current_buf->next();
if (result)
file_offset_of_buffer_end += current_buf->buffer().size();
}
swap(*current_buf); swap(*current_buf);
if (result)
absolute_position += working_buffer.size();
return result; return result;
} }
@ -180,7 +191,6 @@ void ReadBufferFromRemoteFSGather::reset()
current_buf.reset(); current_buf.reset();
} }
String ReadBufferFromRemoteFSGather::getFileName() const String ReadBufferFromRemoteFSGather::getFileName() const
{ {
return canonical_path; return canonical_path;


@@ -37,10 +37,20 @@ public:
     void setReadUntilPosition(size_t position) override;
 
-    size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
+    struct ReadResult
+    {
+        size_t size = 0;
+        size_t offset = 0;
+    };
+
+    ReadResult readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
 
     size_t getFileSize() const;
 
+    size_t offset() const { return file_offset_of_buffer_end; }
+
+    bool initialized() const { return current_buf != nullptr; }
+
 protected:
     virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const = 0;
@@ -57,8 +67,13 @@ private:
     size_t current_buf_idx = 0;
 
-    size_t absolute_position = 0;
+    size_t file_offset_of_buffer_end = 0;
+    /**
+     * File:                        |___________________|
+     * Buffer:                            |~~~~~~~|
+     * file_offset_of_buffer_end:                 ^
+     */
 
     size_t bytes_to_ignore = 0;
 
     size_t read_until_position = 0;


@ -20,7 +20,7 @@ ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
off_t ReadIndirectBufferFromRemoteFS::getPosition() off_t ReadIndirectBufferFromRemoteFS::getPosition()
{ {
return impl->absolute_position - available(); return impl->file_offset_of_buffer_end - available();
} }
@ -35,29 +35,29 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
if (whence == SEEK_CUR) if (whence == SEEK_CUR)
{ {
/// If position within current working buffer - shift pos. /// If position within current working buffer - shift pos.
if (!working_buffer.empty() && size_t(getPosition() + offset_) < impl->absolute_position) if (!working_buffer.empty() && size_t(getPosition() + offset_) < impl->file_offset_of_buffer_end)
{ {
pos += offset_; pos += offset_;
return getPosition(); return getPosition();
} }
else else
{ {
impl->absolute_position += offset_; impl->file_offset_of_buffer_end += offset_;
} }
} }
else if (whence == SEEK_SET) else if (whence == SEEK_SET)
{ {
/// If position within current working buffer - shift pos. /// If position within current working buffer - shift pos.
if (!working_buffer.empty() if (!working_buffer.empty()
&& size_t(offset_) >= impl->absolute_position - working_buffer.size() && size_t(offset_) >= impl->file_offset_of_buffer_end - working_buffer.size()
&& size_t(offset_) < impl->absolute_position) && size_t(offset_) < impl->file_offset_of_buffer_end)
{ {
pos = working_buffer.end() - (impl->absolute_position - offset_); pos = working_buffer.end() - (impl->file_offset_of_buffer_end - offset_);
return getPosition(); return getPosition();
} }
else else
{ {
impl->absolute_position = offset_; impl->file_offset_of_buffer_end = offset_;
} }
} }
else else
@ -66,7 +66,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
impl->reset(); impl->reset();
pos = working_buffer.end(); pos = working_buffer.end();
return impl->absolute_position; return impl->file_offset_of_buffer_end;
} }


@ -8,7 +8,6 @@
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <future> #include <future>
#include <iostream> #include <iostream>
@ -28,7 +27,7 @@ namespace CurrentMetrics
namespace DB namespace DB
{ {
size_t ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) ReadBufferFromRemoteFSGather::ReadResult ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore)
{ {
return reader->readInto(data, size, offset, ignore); return reader->readInto(data, size, offset, ignore);
} }
@ -44,18 +43,18 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
{ {
auto task = std::make_shared<std::packaged_task<Result()>>([request] auto task = std::make_shared<std::packaged_task<Result()>>([request]
{ {
setThreadName("ThreadPoolRemoteFSRead"); setThreadName("VFSRead");
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get()); auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
Stopwatch watch(CLOCK_MONOTONIC); Stopwatch watch(CLOCK_MONOTONIC);
auto bytes_read = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore); auto [bytes_read, offset] = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch.stop(); watch.stop();
ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read); ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read);
return bytes_read; return Result{ .size = bytes_read, .offset = offset };
}); });
auto future = task->get_future(); auto future = task->get_future();


@@ -3,12 +3,12 @@
 #include <IO/AsynchronousReader.h>
 #include <IO/SeekableReadBuffer.h>
 #include <Common/ThreadPool.h>
-#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
 #include <Disks/IDiskRemote.h>
 
 namespace DB
 {
 
+class ReadBufferFromRemoteFSGather;
+
 class ThreadPoolRemoteFSReader : public IAsynchronousReader
 {
@@ -28,9 +28,9 @@ public:
 struct ThreadPoolRemoteFSReader::RemoteFSFileDescriptor : public IFileDescriptor
 {
 public:
-    RemoteFSFileDescriptor(std::shared_ptr<ReadBufferFromRemoteFSGather> reader_) : reader(reader_) {}
+    explicit RemoteFSFileDescriptor(std::shared_ptr<ReadBufferFromRemoteFSGather> reader_) : reader(reader_) {}
 
-    size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
+    ReadBufferFromRemoteFSGather::ReadResult readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
 
 private:
     std::shared_ptr<ReadBufferFromRemoteFSGather> reader;


@@ -168,7 +168,7 @@ private:
     inline static const String RESTORE_FILE_NAME = "restore";
 
     /// Key has format: ../../r{revision}-{operation}
-    const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
+    const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+)$"};
 
     /// Object contains information about schema version.
     inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION";
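The only change here is anchoring the key regexp with $ instead of a trailing .*; a small standalone illustration of the difference, with example keys and matching calls of my own:

```cpp
#include <cassert>
#include <re2/re2.h>

int main()
{
    re2::RE2 old_pattern{".*/r(\\d+)-(\\w+).*"};
    re2::RE2 new_pattern{".*/r(\\d+)-(\\w+)$"};

    // A well-formed operation key matches both variants.
    assert(re2::RE2::FullMatch("disk/operations/r42-restore", old_pattern));
    assert(re2::RE2::FullMatch("disk/operations/r42-restore", new_pattern));

    // A key with trailing garbage was accepted by the old pattern but is rejected by the anchored one.
    assert(re2::RE2::FullMatch("disk/operations/r42-restore copy 1", old_pattern));
    assert(!re2::RE2::FullMatch("disk/operations/r42-restore copy 1", new_pattern));
    return 0;
}
```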


@@ -56,7 +56,7 @@ public:
 
         const auto & source_data = typeid_cast<const ColumnDecimal<DateTime64> &>(col).getData();
 
-        Int32 scale_diff = typeid_cast<const DataTypeDateTime64 &>(*src.type).getScale() - target_scale;
+        const Int32 scale_diff = typeid_cast<const DataTypeDateTime64 &>(*src.type).getScale() - target_scale;
         if (scale_diff == 0)
         {
             for (size_t i = 0; i < input_rows_count; ++i)


@@ -1,5 +1,6 @@
 #pragma once
 
+#include <cstddef>
 #include <type_traits>
 
 #include <IO/WriteBufferFromVector.h>
@@ -34,6 +35,7 @@
 #include <Columns/ColumnTuple.h>
 #include <Columns/ColumnMap.h>
 #include <Columns/ColumnsCommon.h>
+#include <Columns/ColumnStringHelpers.h>
 #include <Common/assert_cast.h>
 #include <Common/quoteString.h>
 #include <Core/AccurateComparison.h>
@ -850,11 +852,15 @@ struct ConvertImpl<FromDataType, std::enable_if_t<!std::is_same_v<FromDataType,
}; };
/// Generic conversion of any type to String. /// Generic conversion of any type to String or FixedString via serialization to text.
template <typename StringColumnType>
struct ConvertImplGenericToString struct ConvertImplGenericToString
{ {
static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) static ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/)
{ {
static_assert(std::is_same_v<StringColumnType, ColumnString> || std::is_same_v<StringColumnType, ColumnFixedString>,
"Can be used only to serialize to ColumnString or ColumnFixedString");
ColumnUInt8::MutablePtr null_map = copyNullMap(arguments[0].column); ColumnUInt8::MutablePtr null_map = copyNullMap(arguments[0].column);
const auto & col_with_type_and_name = columnGetNested(arguments[0]); const auto & col_with_type_and_name = columnGetNested(arguments[0]);
@ -862,27 +868,25 @@ struct ConvertImplGenericToString
const IColumn & col_from = *col_with_type_and_name.column; const IColumn & col_from = *col_with_type_and_name.column;
size_t size = col_from.size(); size_t size = col_from.size();
auto col_to = result_type->createColumn();
auto col_to = ColumnString::create();
ColumnString::Chars & data_to = col_to->getChars();
ColumnString::Offsets & offsets_to = col_to->getOffsets();
data_to.resize(size * 2); /// Using coefficient 2 for initial size is arbitrary.
offsets_to.resize(size);
WriteBufferFromVector<ColumnString::Chars> write_buffer(data_to);
FormatSettings format_settings;
auto serialization = type.getDefaultSerialization();
for (size_t i = 0; i < size; ++i)
{ {
serialization->serializeText(col_from, i, write_buffer, format_settings); ColumnStringHelpers::WriteHelper write_helper(
writeChar(0, write_buffer); assert_cast<StringColumnType &>(*col_to),
offsets_to[i] = write_buffer.count(); size);
}
write_buffer.finalize(); auto & write_buffer = write_helper.getWriteBuffer();
FormatSettings format_settings;
auto serialization = type.getDefaultSerialization();
for (size_t i = 0; i < size; ++i)
{
serialization->serializeText(col_from, i, write_buffer, format_settings);
write_helper.rowWritten();
}
write_helper.finalize();
}
if (result_type->isNullable() && null_map) if (result_type->isNullable() && null_map)
return ColumnNullable::create(std::move(col_to), std::move(null_map)); return ColumnNullable::create(std::move(col_to), std::move(null_map));
@ -1006,7 +1010,8 @@ inline bool tryParseImpl<DataTypeUUID>(DataTypeUUID::FieldType & x, ReadBuffer &
else else
message_buf << " at begin of string"; message_buf << " at begin of string";
if (isNativeNumber(to_type)) // Currently there are no functions toIPv{4,6}Or{Null,Zero}
if (isNativeNumber(to_type) && !(to_type.getName() == "IPv4" || to_type.getName() == "IPv6"))
message_buf << ". Note: there are to" << to_type.getName() << "OrZero and to" << to_type.getName() << "OrNull functions, which returns zero/NULL instead of throwing exception."; message_buf << ". Note: there are to" << to_type.getName() << "OrZero and to" << to_type.getName() << "OrNull functions, which returns zero/NULL instead of throwing exception.";
throw Exception(message_buf.str(), ErrorCodes::CANNOT_PARSE_TEXT); throw Exception(message_buf.str(), ErrorCodes::CANNOT_PARSE_TEXT);
@ -1285,40 +1290,35 @@ template <typename ToDataType, typename Name>
struct ConvertImpl<std::enable_if_t<!std::is_same_v<ToDataType, DataTypeFixedString>, DataTypeFixedString>, ToDataType, Name, ConvertReturnNullOnErrorTag> struct ConvertImpl<std::enable_if_t<!std::is_same_v<ToDataType, DataTypeFixedString>, DataTypeFixedString>, ToDataType, Name, ConvertReturnNullOnErrorTag>
: ConvertThroughParsing<DataTypeFixedString, ToDataType, Name, ConvertFromStringExceptionMode::Null, ConvertFromStringParsingMode::Normal> {}; : ConvertThroughParsing<DataTypeFixedString, ToDataType, Name, ConvertFromStringExceptionMode::Null, ConvertFromStringParsingMode::Normal> {};
/// Generic conversion of any type from String. Used for complex types: Array and Tuple. /// Generic conversion of any type from String. Used for complex types: Array and Tuple or types with custom serialization.
template <typename StringColumnType>
struct ConvertImplGenericFromString struct ConvertImplGenericFromString
{ {
static ColumnPtr execute(ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type) static ColumnPtr execute(ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t input_rows_count)
{ {
static_assert(std::is_same_v<StringColumnType, ColumnString> || std::is_same_v<StringColumnType, ColumnFixedString>,
"Can be used only to parse from ColumnString or ColumnFixedString");
const IColumn & col_from = *arguments[0].column; const IColumn & col_from = *arguments[0].column;
size_t size = col_from.size();
const IDataType & data_type_to = *result_type; const IDataType & data_type_to = *result_type;
if (const StringColumnType * col_from_string = checkAndGetColumn<StringColumnType>(&col_from))
if (const ColumnString * col_from_string = checkAndGetColumn<ColumnString>(&col_from))
{ {
auto res = data_type_to.createColumn(); auto res = data_type_to.createColumn();
IColumn & column_to = *res; IColumn & column_to = *res;
column_to.reserve(size); column_to.reserve(input_rows_count);
const ColumnString::Chars & chars = col_from_string->getChars();
const IColumn::Offsets & offsets = col_from_string->getOffsets();
size_t current_offset = 0;
FormatSettings format_settings; FormatSettings format_settings;
auto serialization = data_type_to.getDefaultSerialization(); auto serialization = data_type_to.getDefaultSerialization();
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < input_rows_count; ++i)
{ {
ReadBufferFromMemory read_buffer(&chars[current_offset], offsets[i] - current_offset - 1); const auto & val = col_from_string->getDataAt(i);
ReadBufferFromMemory read_buffer(val.data, val.size);
serialization->deserializeWholeText(column_to, read_buffer, format_settings); serialization->deserializeWholeText(column_to, read_buffer, format_settings);
if (!read_buffer.eof()) if (!read_buffer.eof())
throwExceptionForIncompletelyParsedValue(read_buffer, result_type); throwExceptionForIncompletelyParsedValue(read_buffer, result_type);
current_offset = offsets[i];
} }
return res; return res;
@ -1767,7 +1767,7 @@ private:
/// Generic conversion of any type to String. /// Generic conversion of any type to String.
if (std::is_same_v<ToDataType, DataTypeString>) if (std::is_same_v<ToDataType, DataTypeString>)
{ {
return ConvertImplGenericToString::execute(arguments, result_type); return ConvertImplGenericToString<ColumnString>::execute(arguments, result_type, input_rows_count);
} }
else else
throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + getName(), throw Exception("Illegal type " + arguments[0].type->getName() + " of argument of function " + getName(),
@ -2725,10 +2725,7 @@ private:
/// Conversion from String through parsing. /// Conversion from String through parsing.
if (checkAndGetDataType<DataTypeString>(from_type_untyped.get())) if (checkAndGetDataType<DataTypeString>(from_type_untyped.get()))
{ {
return [] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t /*input_rows_count*/) return &ConvertImplGenericFromString<ColumnString>::execute;
{
return ConvertImplGenericFromString::execute(arguments, result_type);
};
} }
else else
{ {
@ -2745,10 +2742,7 @@ private:
/// Conversion from String through parsing. /// Conversion from String through parsing.
if (checkAndGetDataType<DataTypeString>(from_type_untyped.get())) if (checkAndGetDataType<DataTypeString>(from_type_untyped.get()))
{ {
return [] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t /*input_rows_count*/) return &ConvertImplGenericFromString<ColumnString>::execute;
{
return ConvertImplGenericFromString::execute(arguments, result_type);
};
} }
const auto * from_type = checkAndGetDataType<DataTypeArray>(from_type_untyped.get()); const auto * from_type = checkAndGetDataType<DataTypeArray>(from_type_untyped.get());
@ -2816,10 +2810,7 @@ private:
/// Conversion from String through parsing. /// Conversion from String through parsing.
if (checkAndGetDataType<DataTypeString>(from_type_untyped.get())) if (checkAndGetDataType<DataTypeString>(from_type_untyped.get()))
{ {
return [] (ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t /*input_rows_count*/) return &ConvertImplGenericFromString<ColumnString>::execute;
{
return ConvertImplGenericFromString::execute(arguments, result_type);
};
} }
const auto * from_type = checkAndGetDataType<DataTypeTuple>(from_type_untyped.get()); const auto * from_type = checkAndGetDataType<DataTypeTuple>(from_type_untyped.get());
@ -3330,6 +3321,38 @@ private:
return false; return false;
}; };
auto make_custom_serialization_wrapper = [&](const auto & types) -> bool
{
using Types = std::decay_t<decltype(types)>;
using ToDataType = typename Types::RightType;
using FromDataType = typename Types::LeftType;
if constexpr (WhichDataType(FromDataType::type_id).isStringOrFixedString())
{
if (to_type->getCustomSerialization())
{
ret = &ConvertImplGenericFromString<typename FromDataType::ColumnType>::execute;
return true;
}
}
if constexpr (WhichDataType(ToDataType::type_id).isStringOrFixedString())
{
if (from_type->getCustomSerialization())
{
ret = [](ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable *, size_t input_rows_count) -> ColumnPtr
{
return ConvertImplGenericToString<typename ToDataType::ColumnType>::execute(arguments, result_type, input_rows_count);
};
return true;
}
}
return false;
};
if (callOnTwoTypeIndexes(from_type->getTypeId(), to_type->getTypeId(), make_custom_serialization_wrapper))
return ret;
if (callOnIndexAndDataType<void>(to_type->getTypeId(), make_default_wrapper)) if (callOnIndexAndDataType<void>(to_type->getTypeId(), make_default_wrapper))
return ret; return ret;


@@ -749,7 +749,7 @@ private:
     {
         ColumnsWithTypeAndName cols;
         cols.emplace_back(col_arr.getDataPtr(), nested_type, "tmp");
-        return ConvertImplGenericToString::execute(cols, std::make_shared<DataTypeString>());
+        return ConvertImplGenericToString<ColumnString>::execute(cols, std::make_shared<DataTypeString>(), col_arr.size());
     }
 }


@@ -69,7 +69,8 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
     {
         Stopwatch watch;
         CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
-        size = prefetch_future.get();
+        auto result = prefetch_future.get();
+        size = result.size;
         ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
     }
 
@@ -90,7 +91,7 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
     {
         /// No pending request. Do synchronous read.
 
-        auto size = readInto(memory.data(), memory.size()).get();
+        auto [size, _] = readInto(memory.data(), memory.size()).get();
         file_offset_of_buffer_end += size;
 
         if (size)
@@ -201,4 +202,3 @@ void AsynchronousReadBufferFromFileDescriptor::rewind()
 }
 
 }


@@ -49,10 +49,18 @@ public:
         size_t ignore = 0;
     };
 
-    /// Less than requested amount of data can be returned.
-    /// If size is zero - the file has ended.
-    /// (for example, EINTR must be handled by implementation automatically)
-    using Result = size_t;
+    struct Result
+    {
+        /// size
+        /// Less than requested amount of data can be returned.
+        /// If size is zero - the file has ended.
+        /// (for example, EINTR must be handled by implementation automatically)
+        size_t size = 0;
+
+        /// offset
+        /// Optional. Useful when implementation needs to do ignore().
+        size_t offset = 0;
+    };
 
     /// Submit request and obtain a handle. This method don't perform any waits.
     /// If this method did not throw, the caller must wait for the result with 'wait' method
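The read buffers changed earlier in this commit consume the pair by skipping the first offset bytes of whatever landed in the buffer; a minimal standalone sketch of that arithmetic (the struct copy and function name are mine):

```cpp
#include <cassert>
#include <cstddef>

// Mirrors how the remote-FS read buffers in this commit interpret the result:
// `size` bytes were placed into the buffer, of which the first `offset` bytes
// only satisfy a lazy seek and must not be exposed to the caller.
struct Result
{
    size_t size = 0;
    size_t offset = 0;
};

static size_t visibleBytes(const Result & result)
{
    return result.size ? result.size - result.offset : 0;
}

int main()
{
    assert(visibleBytes(Result{.size = 4096, .offset = 128}) == 3968);
    assert(visibleBytes(Result{}) == 0);  // size == 0 means the file has ended
    return 0;
}
```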


@@ -239,7 +239,8 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
     }
     else
     {
-        req.SetRange(fmt::format("bytes={}-", offset));
+        if (offset)
+            req.SetRange(fmt::format("bytes={}-", offset));
         LOG_TEST(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset);
     }


@@ -82,10 +82,9 @@ std::future<IAsynchronousReader::Result> SynchronousReader::submit(Request reque
             watch.stop();
             ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
 
-            return bytes_read;
+            return Result{ .size = bytes_read, .offset = 0};
         });
 }
 
 }


@@ -117,7 +117,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
             if (!res)
             {
                 /// The file has ended.
-                promise.set_value(0);
+                promise.set_value({0, 0});
 
                 watch.stop();
                 ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
@@ -176,7 +176,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
             ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
             ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
 
-            promise.set_value(bytes_read);
+            promise.set_value({bytes_read, 0});
             return future;
         }
     }
@@ -219,7 +219,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
         ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds());
         ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
 
-        return bytes_read;
+        return Result{ .size = bytes_read, .offset = 0 };
     });
 
     auto future = task->get_future();


@@ -5,6 +5,7 @@
 #include <memory>
 #include <iostream>
 #include <cassert>
+#include <string.h>
 
 #include <Common/Exception.h>
 #include <Common/MemoryTracker.h>


@ -116,82 +116,62 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q
} }
} }
/// In case of expression/function (order by 1+2 and 2*x1, greatest(1, 2)) replace const auto * ast_literal = typeid_cast<const ASTLiteral *>(argument.get());
/// positions only if all literals are numbers, otherwise it is not positional. if (!ast_literal)
bool positional = true; return false;
/// Case when GROUP BY element is position. auto which = ast_literal->value.getType();
if (const auto * ast_literal = typeid_cast<const ASTLiteral *>(argument.get())) if (which != Field::Types::UInt64)
return false;
auto pos = ast_literal->value.get<UInt64>();
if (!pos || pos > columns.size())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Positional argument out of bounds: {} (exprected in range [1, {}]",
pos, columns.size());
const auto & column = columns[--pos];
if (typeid_cast<const ASTIdentifier *>(column.get()))
{ {
auto which = ast_literal->value.getType(); argument = column->clone();
if (which == Field::Types::UInt64) }
else if (typeid_cast<const ASTFunction *>(column.get()))
{
std::function<void(ASTPtr)> throw_if_aggregate_function = [&](ASTPtr node)
{ {
auto pos = ast_literal->value.get<UInt64>(); if (const auto * function = typeid_cast<const ASTFunction *>(node.get()))
if (pos > 0 && pos <= columns.size())
{ {
const auto & column = columns[--pos]; auto is_aggregate_function = AggregateFunctionFactory::instance().isAggregateFunctionName(function->name);
if (typeid_cast<const ASTIdentifier *>(column.get())) if (is_aggregate_function)
{ {
argument = column->clone(); throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
} "Illegal value (aggregate function) for positional argument in {}",
else if (typeid_cast<const ASTFunction *>(column.get())) ASTSelectQuery::expressionToString(expression));
{
std::function<void(ASTPtr)> throw_if_aggregate_function = [&](ASTPtr node)
{
if (const auto * function = typeid_cast<const ASTFunction *>(node.get()))
{
auto is_aggregate_function = AggregateFunctionFactory::instance().isAggregateFunctionName(function->name);
if (is_aggregate_function)
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal value (aggregate function) for positional argument in {}",
ASTSelectQuery::expressionToString(expression));
}
else
{
if (function->arguments)
{
for (const auto & arg : function->arguments->children)
throw_if_aggregate_function(arg);
}
}
}
};
if (expression == ASTSelectQuery::Expression::GROUP_BY)
throw_if_aggregate_function(column);
argument = column->clone();
} }
else else
{ {
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, if (function->arguments)
"Illegal value for positional argument in {}", {
ASTSelectQuery::expressionToString(expression)); for (const auto & arg : function->arguments->children)
throw_if_aggregate_function(arg);
}
} }
} }
else if (pos > columns.size() || !pos) };
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, if (expression == ASTSelectQuery::Expression::GROUP_BY)
"Positional argument out of bounds: {} (exprected in range [1, {}]", throw_if_aggregate_function(column);
pos, columns.size());
} argument = column->clone();
}
else
positional = false;
}
else if (const auto * ast_function = typeid_cast<const ASTFunction *>(argument.get()))
{
if (ast_function->arguments)
{
for (auto & arg : ast_function->arguments->children)
positional &= checkPositionalArguments(arg, select_query, expression);
}
} }
else else
positional = false; {
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal value for positional argument in {}",
ASTSelectQuery::expressionToString(expression));
}
return positional; return true;
} }
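For orientation, the rewritten check above only treats a bare UInt64 literal as a positional reference and validates it against the number of select expressions; everything else is either left untouched or rejected. A rough Python sketch of that rule (illustrative only; replace_positional and select_columns are names invented here, not part of the diff):

    def replace_positional(argument, select_columns):
        # Only a bare positive integer literal within [1, len(select_columns)] is positional.
        if not isinstance(argument, int) or isinstance(argument, bool):
            return None  # not positional, leave the expression as is
        if argument < 1 or argument > len(select_columns):
            raise ValueError(
                f"Positional argument out of bounds: {argument} "
                f"(expected in range [1, {len(select_columns)}])")
        return select_columns[argument - 1]  # 1-based position -> select expression

    assert replace_positional(2, ["x3", "x2", "x1"]) == "x2"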
void replaceForPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_query, ASTSelectQuery::Expression expression) void replaceForPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_query, ASTSelectQuery::Expression expression)

View File

@ -40,7 +40,7 @@ TEST(MySQLCreateRewritten, ColumnsDataType)
{"TINYINT", "Int8"}, {"SMALLINT", "Int16"}, {"MEDIUMINT", "Int32"}, {"INT", "Int32"}, {"TINYINT", "Int8"}, {"SMALLINT", "Int16"}, {"MEDIUMINT", "Int32"}, {"INT", "Int32"},
{"INTEGER", "Int32"}, {"BIGINT", "Int64"}, {"FLOAT", "Float32"}, {"DOUBLE", "Float64"}, {"INTEGER", "Int32"}, {"BIGINT", "Int64"}, {"FLOAT", "Float32"}, {"DOUBLE", "Float64"},
{"VARCHAR(10)", "String"}, {"CHAR(10)", "String"}, {"Date", "Date"}, {"DateTime", "DateTime"}, {"VARCHAR(10)", "String"}, {"CHAR(10)", "String"}, {"Date", "Date"}, {"DateTime", "DateTime"},
{"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"} {"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"}, {"BIT", "UInt64"}
}; };
for (const auto & [test_type, mapped_type] : test_types) for (const auto & [test_type, mapped_type] : test_types)

View File

@ -402,7 +402,7 @@ bool ParserVariableArityOperatorList::parseImpl(Pos & pos, ASTPtr & node, Expect
bool ParserBetweenExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) bool ParserBetweenExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{ {
/// For the expression (subject [NOT] BETWEEN left AND right) /// For the expression (subject [NOT] BETWEEN left AND right)
/// create an AST the same as for (subject> = left AND subject <= right). /// create an AST the same as for (subject >= left AND subject <= right).
ParserKeyword s_not("NOT"); ParserKeyword s_not("NOT");
ParserKeyword s_between("BETWEEN"); ParserKeyword s_between("BETWEEN");

View File

@ -2,6 +2,7 @@
#if USE_MYSQL #if USE_MYSQL
#include <vector> #include <vector>
#include <Core/MySQL/MySQLReplication.h>
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
@ -126,7 +127,7 @@ namespace
{ {
using ValueType = ExternalResultDescription::ValueType; using ValueType = ExternalResultDescription::ValueType;
void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size) void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size, enum enum_field_types mysql_type)
{ {
switch (type) switch (type)
{ {
@ -143,9 +144,24 @@ namespace
read_bytes_size += 4; read_bytes_size += 4;
break; break;
case ValueType::vtUInt64: case ValueType::vtUInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt()); {
                read_bytes_size += 8; // We don't have the enum_field_types definition in mysqlxx/Types.h, so the literal value is used directly here (16 == MYSQL_TYPE_BIT).
if (static_cast<int>(mysql_type) == 16)
{
size_t n = value.size();
UInt64 val = 0UL;
ReadBufferFromMemory payload(const_cast<char *>(value.data()), n);
MySQLReplication::readBigEndianStrict(payload, reinterpret_cast<char *>(&val), n);
assert_cast<ColumnUInt64 &>(column).insertValue(val);
read_bytes_size += n;
}
else
{
assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt());
read_bytes_size += 8;
}
break; break;
}
case ValueType::vtInt8: case ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(value.getInt()); assert_cast<ColumnInt8 &>(column).insertValue(value.getInt());
read_bytes_size += 1; read_bytes_size += 1;
@ -258,12 +274,12 @@ Chunk MySQLSource::generate()
{ {
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]); ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type); const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size); insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index]));
column_nullable.getNullMapData().emplace_back(false); column_nullable.getNullMapData().emplace_back(false);
} }
else else
{ {
insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size); insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index]));
} }
} }
else else
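The new vtUInt64 branch above exists because libmysqlclient returns BIT columns as a big-endian byte string rather than a number. A minimal sketch of that decoding step (Python, purely illustrative; decode_mysql_bit is not a name from the diff):

    def decode_mysql_bit(raw: bytes) -> int:
        # BIT(N) with N <= 64 arrives as ceil(N/8) big-endian bytes; read it as a UInt64.
        assert len(raw) <= 8
        return int.from_bytes(raw, byteorder="big")

    assert decode_mysql_bit(b"\x0a") == 10  # the BIT(4) literal b'1010'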

View File

@ -557,6 +557,8 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
std::lock_guard lock(conn_stats_mutex); std::lock_guard lock(conn_stats_mutex);
conn_stats.updateLatency(elapsed); conn_stats.updateLatency(elapsed);
} }
operations.erase(response->xid);
keeper_dispatcher->updateKeeperStatLatency(elapsed); keeper_dispatcher->updateKeeperStatLatency(elapsed);
last_op.set(std::make_unique<LastOp>(LastOp{ last_op.set(std::make_unique<LastOp>(LastOp{

View File

@ -93,7 +93,7 @@ private:
Poco::Timestamp established; Poco::Timestamp established;
using Operations = std::map<Coordination::XID, Poco::Timestamp>; using Operations = std::unordered_map<Coordination::XID, Poco::Timestamp>;
Operations operations; Operations operations;
LastOpMultiVersion last_op; LastOpMultiVersion last_op;

View File

@ -198,7 +198,9 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts &
for (const auto i : collections::range(0, parts.size())) for (const auto i : collections::range(0, parts.size()))
{ {
const auto & part = parts[i]; const auto & part = parts[i];
is_part_on_remote_disk[i] = part.data_part->isStoredOnRemoteDisk(); bool part_on_remote_disk = part.data_part->isStoredOnRemoteDisk();
is_part_on_remote_disk[i] = part_on_remote_disk;
do_not_steal_tasks |= part_on_remote_disk;
/// Read marks for every data part. /// Read marks for every data part.
size_t sum_marks = 0; size_t sum_marks = 0;

View File

@ -203,6 +203,8 @@ void MergeTreeReaderCompact::readData(
{ {
const auto & [name, type] = name_and_type; const auto & [name, type] = name_and_type;
adjustUpperBound(current_task_last_mark); /// Must go before seek.
if (!isContinuousReading(from_mark, column_position)) if (!isContinuousReading(from_mark, column_position))
seekToMark(from_mark, column_position); seekToMark(from_mark, column_position);
@ -211,8 +213,6 @@ void MergeTreeReaderCompact::readData(
if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes)) if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes))
return nullptr; return nullptr;
/// For asynchronous reading from remote fs.
data_buffer->setReadUntilPosition(marks_loader.getMark(current_task_last_mark).offset_in_compressed_file);
return data_buffer; return data_buffer;
}; };
@ -275,6 +275,34 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
} }
} }
void MergeTreeReaderCompact::adjustUpperBound(size_t last_mark)
{
auto right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file;
if (!right_offset)
{
/// If already reading till the end of file.
if (last_right_offset && *last_right_offset == 0)
return;
last_right_offset = 0; // Zero value means the end of file.
if (cached_buffer)
cached_buffer->setReadUntilEnd();
if (non_cached_buffer)
non_cached_buffer->setReadUntilEnd();
}
else
{
if (last_right_offset && right_offset <= last_right_offset.value())
return;
last_right_offset = right_offset;
if (cached_buffer)
cached_buffer->setReadUntilPosition(right_offset);
if (non_cached_buffer)
non_cached_buffer->setReadUntilPosition(right_offset);
}
}
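Restating the intent of adjustUpperBound above as a small Python sketch (illustrative only; the two callbacks stand in for setReadUntilPosition / setReadUntilEnd, and an offset of 0 keeps its meaning of "read until the end of file"):

    def adjust_upper_bound(right_offset, last_right_offset, set_until_position, set_until_end):
        if right_offset == 0:
            if last_right_offset == 0:
                return 0  # already reading till the end of file
            set_until_end()
            return 0
        # The bound only ever moves forward; a smaller or equal offset is already covered.
        if last_right_offset is not None and right_offset <= last_right_offset:
            return last_right_offset
        set_until_position(right_offset)
        return right_offset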
bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position) bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position)
{ {
if (!last_read_granule) if (!last_read_granule)

View File

@ -52,6 +52,9 @@ private:
/// Should we read full column or only it's offsets /// Should we read full column or only it's offsets
std::vector<bool> read_only_offsets; std::vector<bool> read_only_offsets;
/// For asynchronous reading from remote fs. Same meaning as in MergeTreeReaderStream.
std::optional<size_t> last_right_offset;
size_t next_mark = 0; size_t next_mark = 0;
std::optional<std::pair<size_t, size_t>> last_read_granule; std::optional<std::pair<size_t, size_t>> last_read_granule;
@ -67,6 +70,9 @@ private:
MergeTreeMarksLoader & marks_loader, MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions, const ColumnPositions & column_positions,
const MarkRanges & mark_ranges); const MarkRanges & mark_ranges);
/// For asynchronous reading from remote fs.
void adjustUpperBound(size_t last_mark);
}; };
} }

View File

@ -1,12 +1,21 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from collections import namedtuple
import json import json
import time import time
import jwt
import jwt
import requests import requests
import boto3 import boto3
NEED_RERUN_OR_CANCELL_WORKFLOWS = {
13241696, # PR
15834118, # Docs
15522500, # MasterCI
15516108, # ReleaseCI
15797242, # BackportPR
}
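The numeric values in NEED_RERUN_OR_CANCELL_WORKFLOWS are GitHub workflow ids. If they ever need to be refreshed, they can be listed through the public Actions API; a hedged helper sketch (not part of this lambda, unauthenticated and therefore subject to rate limits):

    import requests

    def list_workflow_ids(repo="ClickHouse/ClickHouse"):
        # GET /repos/{owner}/{repo}/actions/workflows returns every workflow with its numeric id.
        response = requests.get(f"https://api.github.com/repos/{repo}/actions/workflows")
        response.raise_for_status()
        return {workflow["name"]: workflow["id"] for workflow in response.json()["workflows"]}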
# https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run # https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run
# #
API_URL = 'https://api.github.com/repos/ClickHouse/ClickHouse' API_URL = 'https://api.github.com/repos/ClickHouse/ClickHouse'
@ -70,19 +79,32 @@ def _exec_get_with_retry(url):
raise Exception("Cannot execute GET request with retries") raise Exception("Cannot execute GET request with retries")
def get_workflows_cancel_urls_for_pull_request(pull_request_event): WorkflowDescription = namedtuple('WorkflowDescription',
['run_id', 'status', 'rerun_url', 'cancel_url'])
def get_workflows_description_for_pull_request(pull_request_event):
head_branch = pull_request_event['head']['ref'] head_branch = pull_request_event['head']['ref']
print("PR", pull_request_event['number'], "has head ref", head_branch) print("PR", pull_request_event['number'], "has head ref", head_branch)
workflows = _exec_get_with_retry(API_URL + f"/actions/runs?branch={head_branch}") workflows = _exec_get_with_retry(API_URL + f"/actions/runs?branch={head_branch}")
workflows_urls_to_cancel = set([]) workflow_descriptions = []
for workflow in workflows['workflow_runs']: for workflow in workflows['workflow_runs']:
if workflow['status'] != 'completed': if workflow['workflow_id'] in NEED_RERUN_OR_CANCELL_WORKFLOWS:
print("Workflow", workflow['url'], "not finished, going to be cancelled") workflow_descriptions.append(WorkflowDescription(
workflows_urls_to_cancel.add(workflow['cancel_url']) run_id=workflow['id'],
else: status=workflow['status'],
print("Workflow", workflow['url'], "already finished, will not try to cancel") rerun_url=workflow['rerun_url'],
cancel_url=workflow['cancel_url']))
return workflows_urls_to_cancel return workflow_descriptions
def get_workflow_description(workflow_id):
workflow = _exec_get_with_retry(API_URL + f"/actions/runs/{workflow_id}")
return WorkflowDescription(
run_id=workflow['id'],
status=workflow['status'],
rerun_url=workflow['rerun_url'],
cancel_url=workflow['cancel_url'])
def _exec_post_with_retry(url, token): def _exec_post_with_retry(url, token):
headers = { headers = {
@ -99,11 +121,11 @@ def _exec_post_with_retry(url, token):
raise Exception("Cannot execute POST request with retry") raise Exception("Cannot execute POST request with retry")
def cancel_workflows(urls_to_cancel, token): def exec_workflow_url(urls_to_cancel, token):
for url in urls_to_cancel: for url in urls_to_cancel:
print("Cancelling workflow using url", url) print("Post for workflow workflow using url", url)
_exec_post_with_retry(url, token) _exec_post_with_retry(url, token)
print("Workflow cancelled") print("Workflow post finished")
def main(event): def main(event):
token = get_token_from_aws() token = get_token_from_aws()
@ -117,9 +139,39 @@ def main(event):
print("PR has labels", labels) print("PR has labels", labels)
if action == 'closed' or 'do not test' in labels: if action == 'closed' or 'do not test' in labels:
print("PR merged/closed or manually labeled 'do not test' will kill workflows") print("PR merged/closed or manually labeled 'do not test' will kill workflows")
workflows_to_cancel = get_workflows_cancel_urls_for_pull_request(pull_request) workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
print(f"Found {len(workflows_to_cancel)} workflows to cancel") urls_to_cancel = []
cancel_workflows(workflows_to_cancel, token) for workflow_description in workflow_descriptions:
if workflow_description.status != 'completed':
urls_to_cancel.append(workflow_description.cancel_url)
print(f"Found {len(urls_to_cancel)} workflows to cancel")
exec_workflow_url(urls_to_cancel, token)
elif action == 'labeled' and 'can be tested' in labels:
print("PR marked with can be tested label, rerun workflow")
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
if not workflow_descriptions:
print("Not found any workflows")
return
sorted_workflows = list(sorted(workflow_descriptions, key=lambda x: x.run_id))
most_recent_workflow = sorted_workflows[-1]
print("Latest workflow", most_recent_workflow)
if most_recent_workflow.status != 'completed':
print("Latest workflow is not completed, cancelling")
exec_workflow_url([most_recent_workflow.cancel_url], token)
print("Cancelled")
for _ in range(30):
latest_workflow_desc = get_workflow_description(most_recent_workflow.run_id)
print("Checking latest workflow", latest_workflow_desc)
if latest_workflow_desc.status in ('completed', 'cancelled'):
print("Finally latest workflow done, going to rerun")
exec_workflow_url([most_recent_workflow.rerun_url], token)
print("Rerun finished, exiting")
break
print("Still have strange status")
time.sleep(3)
else: else:
print("Nothing to do") print("Nothing to do")

View File

@ -0,0 +1,78 @@
#!/usr/bin/env python3
import os
import subprocess
import logging
from github import Github
from env_helper import IMAGES_PATH, REPO_COPY
from stopwatch import Stopwatch
from upload_result_helper import upload_results
from s3_helper import S3Helper
from get_robot_token import get_best_robot_token
from pr_info import PRInfo
from commit_status_helper import post_commit_status
from docker_pull_helper import get_image_with_version
from tee_popen import TeePopen
NAME = "Woboq Build (actions)"
def get_run_command(repo_path, output_path, image):
cmd = "docker run " + \
f"--volume={repo_path}:/repo_folder " \
f"--volume={output_path}:/test_output " \
f"-e 'DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/data' {image}"
return cmd
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
stopwatch = Stopwatch()
temp_path = os.getenv("TEMP_PATH", os.path.abspath("."))
pr_info = PRInfo()
gh = Github(get_best_robot_token())
if not os.path.exists(temp_path):
os.makedirs(temp_path)
docker_image = get_image_with_version(IMAGES_PATH, 'clickhouse/codebrowser')
s3_helper = S3Helper('https://s3.amazonaws.com')
result_path = os.path.join(temp_path, "result_path")
if not os.path.exists(result_path):
os.makedirs(result_path)
run_command = get_run_command(REPO_COPY, result_path, docker_image)
logging.info("Going to run codebrowser: %s", run_command)
run_log_path = os.path.join(temp_path, "runlog.log")
with TeePopen(run_command, run_log_path) as process:
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True)
report_path = os.path.join(result_path, "html_report")
logging.info("Report path %s", report_path)
s3_path_prefix = "codebrowser"
html_urls = s3_helper.fast_parallel_upload_dir(report_path, s3_path_prefix, 'clickhouse-test-reports')
index_html = '<a href="https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/index.html">HTML report</a>'
test_results = [(index_html, "Look at the report")]
report_url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME)
print(f"::notice ::Report url: {report_url}")
post_commit_status(gh, pr_info.sha, NAME, "Report built", "success", report_url)

View File

@ -33,7 +33,7 @@ def get_pr_for_commit(sha, ref):
class PRInfo: class PRInfo:
def __init__(self, github_event=None, need_orgs=False, need_changed_files=False): def __init__(self, github_event=None, need_orgs=False, need_changed_files=False, labels_from_api=False):
if not github_event: if not github_event:
if GITHUB_EVENT_PATH: if GITHUB_EVENT_PATH:
with open(GITHUB_EVENT_PATH, 'r', encoding='utf-8') as event_file: with open(GITHUB_EVENT_PATH, 'r', encoding='utf-8') as event_file:
@ -61,7 +61,12 @@ class PRInfo:
self.head_ref = github_event['pull_request']['head']['ref'] self.head_ref = github_event['pull_request']['head']['ref']
self.head_name = github_event['pull_request']['head']['repo']['full_name'] self.head_name = github_event['pull_request']['head']['repo']['full_name']
self.labels = {l['name'] for l in github_event['pull_request']['labels']} if labels_from_api:
response = requests.get(f"https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels")
self.labels = {l['name'] for l in response.json()}
else:
self.labels = {l['name'] for l in github_event['pull_request']['labels']}
self.user_login = github_event['pull_request']['user']['login'] self.user_login = github_event['pull_request']['user']['login']
self.user_orgs = set([]) self.user_orgs = set([])
if need_orgs: if need_orgs:
@ -90,7 +95,12 @@ class PRInfo:
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/compare/{github_event['before']}...{self.sha}" f"https://api.github.com/repos/{GITHUB_REPOSITORY}/compare/{github_event['before']}...{self.sha}"
else: else:
self.number = pull_request['number'] self.number = pull_request['number']
self.labels = {l['name'] for l in pull_request['labels']} if labels_from_api:
response = requests.get(f"https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels")
self.labels = {l['name'] for l in response.json()}
else:
self.labels = {l['name'] for l in pull_request['labels']}
self.base_ref = pull_request['base']['ref'] self.base_ref = pull_request['base']['ref']
self.base_name = pull_request['base']['repo']['full_name'] self.base_name = pull_request['base']['repo']['full_name']
self.head_ref = pull_request['head']['ref'] self.head_ref = pull_request['head']['ref']
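One caveat next to the labels_from_api change above: the issues/{number}/labels endpoint is paginated (30 items per page by default), so very label-heavy PRs would need paging. An illustrative sketch, not a helper from the diff:

    import requests

    def get_labels(repo, number):
        labels, page = set(), 1
        while True:
            response = requests.get(
                f"https://api.github.com/repos/{repo}/issues/{number}/labels",
                params={"per_page": 100, "page": page})
            response.raise_for_status()
            chunk = response.json()
            labels.update(label["name"] for label in chunk)
            if len(chunk) < 100:
                return labels
            page += 1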

View File

@ -90,6 +90,7 @@ def pr_is_by_trusted_user(pr_user_login, pr_user_orgs):
# can be skipped entirely. # can be skipped entirely.
def should_run_checks_for_pr(pr_info): def should_run_checks_for_pr(pr_info):
# Consider the labels and whether the user is trusted. # Consider the labels and whether the user is trusted.
print("Got labels", pr_info.labels)
force_labels = set(['force tests']).intersection(pr_info.labels) force_labels = set(['force tests']).intersection(pr_info.labels)
if force_labels: if force_labels:
return True, "Labeled '{}'".format(', '.join(force_labels)) return True, "Labeled '{}'".format(', '.join(force_labels))
@ -109,7 +110,7 @@ def should_run_checks_for_pr(pr_info):
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
pr_info = PRInfo(need_orgs=True) pr_info = PRInfo(need_orgs=True, labels_from_api=True)
can_run, description = should_run_checks_for_pr(pr_info) can_run, description = should_run_checks_for_pr(pr_info)
gh = Github(get_best_robot_token()) gh = Github(get_best_robot_token())
commit = get_commit(gh, pr_info.sha) commit = get_commit(gh, pr_info.sha)

View File

@ -4,6 +4,7 @@ import logging
import os import os
import re import re
import shutil import shutil
import time
from multiprocessing.dummy import Pool from multiprocessing.dummy import Pool
import boto3 import boto3
@ -83,6 +84,58 @@ class S3Helper:
else: else:
return S3Helper.copy_file_to_local(S3_BUILDS_BUCKET, file_path, s3_path) return S3Helper.copy_file_to_local(S3_BUILDS_BUCKET, file_path, s3_path)
def fast_parallel_upload_dir(self, dir_path, s3_dir_path, bucket_name):
all_files = []
for root, _, files in os.walk(dir_path):
for file in files:
all_files.append(os.path.join(root, file))
logging.info("Files found %s", len(all_files))
counter = 0
t = time.time()
sum_time = 0
def upload_task(file_path):
nonlocal counter
nonlocal t
nonlocal sum_time
try:
s3_path = file_path.replace(dir_path, s3_dir_path)
metadata = {}
if s3_path.endswith("html"):
metadata['ContentType'] = "text/html; charset=utf-8"
elif s3_path.endswith("css"):
metadata['ContentType'] = "text/css; charset=utf-8"
elif s3_path.endswith("js"):
metadata['ContentType'] = "text/javascript; charset=utf-8"
# Retry
for i in range(5):
try:
self.client.upload_file(file_path, bucket_name, s3_path, ExtraArgs=metadata)
break
except Exception as ex:
if i == 4:
raise ex
time.sleep(0.1 * i)
counter += 1
if counter % 1000 == 0:
sum_time += int(time.time() - t)
print("Uploaded", counter, "-", int(time.time() - t), "s", "sum time", sum_time, "s")
t = time.time()
except Exception as ex:
logging.critical("Failed to upload file, expcetion %s", ex)
return "https://s3.amazonaws.com/{bucket}/{path}".format(bucket=bucket_name, path=s3_path)
p = Pool(256)
logging.basicConfig(level=logging.CRITICAL)
result = sorted(_flatten_list(p.map(upload_task, all_files)))
logging.basicConfig(level=logging.INFO)
return result
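The per-file retry in fast_parallel_upload_dir above is five attempts with a linearly growing sleep; the same idiom factored out as a standalone sketch (illustrative, retry is not a helper from the diff):

    import time

    def retry(action, attempts=5, backoff=0.1):
        for i in range(attempts):
            try:
                return action()
            except Exception:
                if i == attempts - 1:
                    raise
                time.sleep(backoff * i)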
def _upload_folder_to_s3(self, folder_path, s3_folder_path, bucket_name, keep_dirs_in_s3_path, upload_symlinks): def _upload_folder_to_s3(self, folder_path, s3_folder_path, bucket_name, keep_dirs_in_s3_path, upload_symlinks):
logging.info("Upload folder '%s' to bucket=%s of s3 folder '%s'", folder_path, bucket_name, s3_folder_path) logging.info("Upload folder '%s' to bucket=%s of s3 folder '%s'", folder_path, bucket_name, s3_folder_path)
if not os.path.exists(folder_path): if not os.path.exists(folder_path):

View File

@ -23,7 +23,7 @@ SUSPICIOUS_PATTERNS = [
] ]
MAX_RETRY = 5 MAX_RETRY = 5
MAX_WORKFLOW_RERUN = 5 MAX_WORKFLOW_RERUN = 7
WorkflowDescription = namedtuple('WorkflowDescription', WorkflowDescription = namedtuple('WorkflowDescription',
['name', 'action', 'run_id', 'event', 'workflow_id', 'conclusion', 'status', 'api_url', ['name', 'action', 'run_id', 'event', 'workflow_id', 'conclusion', 'status', 'api_url',
@ -44,6 +44,7 @@ NEED_RERUN_WORKFLOWS = {
15834118, # Docs 15834118, # Docs
15522500, # MasterCI 15522500, # MasterCI
15516108, # ReleaseCI 15516108, # ReleaseCI
15797242, # BackportPR
} }
# Individual trusted contributors who are not in any trusted organization.                # Individual trusted contributors who are not in any trusted organization.

View File

@ -1527,7 +1527,7 @@ class ClickHouseCluster:
if os.path.exists(self.mysql_dir): if os.path.exists(self.mysql_dir):
shutil.rmtree(self.mysql_dir) shutil.rmtree(self.mysql_dir)
os.makedirs(self.mysql_logs_dir) os.makedirs(self.mysql_logs_dir)
os.chmod(self.mysql_logs_dir, stat.S_IRWXO) os.chmod(self.mysql_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
subprocess_check_call(self.base_mysql_cmd + common_opts) subprocess_check_call(self.base_mysql_cmd + common_opts)
self.up_called = True self.up_called = True
self.wait_mysql_to_start() self.wait_mysql_to_start()
@ -1537,7 +1537,7 @@ class ClickHouseCluster:
if os.path.exists(self.mysql8_dir): if os.path.exists(self.mysql8_dir):
shutil.rmtree(self.mysql8_dir) shutil.rmtree(self.mysql8_dir)
os.makedirs(self.mysql8_logs_dir) os.makedirs(self.mysql8_logs_dir)
os.chmod(self.mysql8_logs_dir, stat.S_IRWXO) os.chmod(self.mysql8_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
subprocess_check_call(self.base_mysql8_cmd + common_opts) subprocess_check_call(self.base_mysql8_cmd + common_opts)
self.wait_mysql8_to_start() self.wait_mysql8_to_start()
@ -1546,7 +1546,7 @@ class ClickHouseCluster:
if os.path.exists(self.mysql_cluster_dir): if os.path.exists(self.mysql_cluster_dir):
shutil.rmtree(self.mysql_cluster_dir) shutil.rmtree(self.mysql_cluster_dir)
os.makedirs(self.mysql_cluster_logs_dir) os.makedirs(self.mysql_cluster_logs_dir)
os.chmod(self.mysql_cluster_logs_dir, stat.S_IRWXO) os.chmod(self.mysql_cluster_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
subprocess_check_call(self.base_mysql_cluster_cmd + common_opts) subprocess_check_call(self.base_mysql_cluster_cmd + common_opts)
self.up_called = True self.up_called = True
@ -1557,7 +1557,7 @@ class ClickHouseCluster:
if os.path.exists(self.postgres_dir): if os.path.exists(self.postgres_dir):
shutil.rmtree(self.postgres_dir) shutil.rmtree(self.postgres_dir)
os.makedirs(self.postgres_logs_dir) os.makedirs(self.postgres_logs_dir)
os.chmod(self.postgres_logs_dir, stat.S_IRWXO) os.chmod(self.postgres_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
subprocess_check_call(self.base_postgres_cmd + common_opts) subprocess_check_call(self.base_postgres_cmd + common_opts)
self.up_called = True self.up_called = True
@ -1566,11 +1566,11 @@ class ClickHouseCluster:
if self.with_postgres_cluster and self.base_postgres_cluster_cmd: if self.with_postgres_cluster and self.base_postgres_cluster_cmd:
print('Setup Postgres') print('Setup Postgres')
os.makedirs(self.postgres2_logs_dir) os.makedirs(self.postgres2_logs_dir)
os.chmod(self.postgres2_logs_dir, stat.S_IRWXO) os.chmod(self.postgres2_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
os.makedirs(self.postgres3_logs_dir) os.makedirs(self.postgres3_logs_dir)
os.chmod(self.postgres3_logs_dir, stat.S_IRWXO) os.chmod(self.postgres3_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
os.makedirs(self.postgres4_logs_dir) os.makedirs(self.postgres4_logs_dir)
os.chmod(self.postgres4_logs_dir, stat.S_IRWXO) os.chmod(self.postgres4_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
subprocess_check_call(self.base_postgres_cluster_cmd + common_opts) subprocess_check_call(self.base_postgres_cluster_cmd + common_opts)
self.up_called = True self.up_called = True
self.wait_postgres_cluster_to_start() self.wait_postgres_cluster_to_start()
@ -1591,7 +1591,7 @@ class ClickHouseCluster:
if self.with_rabbitmq and self.base_rabbitmq_cmd: if self.with_rabbitmq and self.base_rabbitmq_cmd:
logging.debug('Setup RabbitMQ') logging.debug('Setup RabbitMQ')
os.makedirs(self.rabbitmq_logs_dir) os.makedirs(self.rabbitmq_logs_dir)
os.chmod(self.rabbitmq_logs_dir, stat.S_IRWXO) os.chmod(self.rabbitmq_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
for i in range(5): for i in range(5):
subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes']) subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
@ -1604,7 +1604,7 @@ class ClickHouseCluster:
if self.with_hdfs and self.base_hdfs_cmd: if self.with_hdfs and self.base_hdfs_cmd:
logging.debug('Setup HDFS') logging.debug('Setup HDFS')
os.makedirs(self.hdfs_logs_dir) os.makedirs(self.hdfs_logs_dir)
os.chmod(self.hdfs_logs_dir, stat.S_IRWXO) os.chmod(self.hdfs_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
subprocess_check_call(self.base_hdfs_cmd + common_opts) subprocess_check_call(self.base_hdfs_cmd + common_opts)
self.up_called = True self.up_called = True
self.make_hdfs_api() self.make_hdfs_api()
@ -1613,7 +1613,7 @@ class ClickHouseCluster:
if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd: if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
logging.debug('Setup kerberized HDFS') logging.debug('Setup kerberized HDFS')
os.makedirs(self.hdfs_kerberized_logs_dir) os.makedirs(self.hdfs_kerberized_logs_dir)
os.chmod(self.hdfs_kerberized_logs_dir, stat.S_IRWXO) os.chmod(self.hdfs_kerberized_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
run_and_check(self.base_kerberized_hdfs_cmd + common_opts) run_and_check(self.base_kerberized_hdfs_cmd + common_opts)
self.up_called = True self.up_called = True
self.make_hdfs_api(kerberized=True) self.make_hdfs_api(kerberized=True)
@ -1669,7 +1669,7 @@ class ClickHouseCluster:
if self.with_jdbc_bridge and self.base_jdbc_bridge_cmd: if self.with_jdbc_bridge and self.base_jdbc_bridge_cmd:
os.makedirs(self.jdbc_driver_logs_dir) os.makedirs(self.jdbc_driver_logs_dir)
os.chmod(self.jdbc_driver_logs_dir, stat.S_IRWXO) os.chmod(self.jdbc_driver_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
subprocess_check_call(self.base_jdbc_bridge_cmd + ['up', '-d']) subprocess_check_call(self.base_jdbc_bridge_cmd + ['up', '-d'])
self.up_called = True self.up_called = True
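For reference, the permission change applied to every log directory above, spelled out numerically (a quick illustrative check, not part of the test harness):

    import stat

    assert stat.S_IRWXO == 0o007                 # old mode: rwx for "others" only
    assert stat.S_IRWXU | stat.S_IRWXO == 0o707  # new mode: rwx for the owner as well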

View File

@ -1,7 +1,7 @@
[pytest] [pytest]
python_files = test*.py python_files = test*.py
norecursedirs = _instances* norecursedirs = _instances*
timeout = 1800 timeout = 900
junit_duration_report = call junit_duration_report = call
junit_suite_name = integration junit_suite_name = integration
log_level = DEBUG log_level = DEBUG

View File

@ -624,7 +624,7 @@ def err_sync_user_privs_with_materialized_mysql_database(clickhouse_node, mysql_
service_name)) service_name))
assert "priv_err_db" in clickhouse_node.query("SHOW DATABASES") assert "priv_err_db" in clickhouse_node.query("SHOW DATABASES")
assert "test_table_1" not in clickhouse_node.query("SHOW TABLES FROM priv_err_db") assert "test_table_1" not in clickhouse_node.query("SHOW TABLES FROM priv_err_db")
clickhouse_node.query("DETACH DATABASE priv_err_db") clickhouse_node.query_with_retry("DETACH DATABASE priv_err_db")
mysql_node.query("REVOKE SELECT ON priv_err_db.* FROM 'test'@'%'") mysql_node.query("REVOKE SELECT ON priv_err_db.* FROM 'test'@'%'")
time.sleep(3) time.sleep(3)
@ -743,7 +743,7 @@ def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_nam
time.sleep(sleep_time) time.sleep(sleep_time)
clickhouse_node.query("SELECT * FROM test_database.test_table") clickhouse_node.query("SELECT * FROM test_database.test_table")
clickhouse_node.query("DETACH DATABASE test_database") clickhouse_node.query_with_retry("DETACH DATABASE test_database")
clickhouse_node.query("ATTACH DATABASE test_database") clickhouse_node.query("ATTACH DATABASE test_database")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table ORDER BY id FORMAT TSV", '1\n2\n') check_query(clickhouse_node, "SELECT * FROM test_database.test_table ORDER BY id FORMAT TSV", '1\n2\n')
@ -784,7 +784,7 @@ def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name):
mysql_node.alloc_connection() mysql_node.alloc_connection()
clickhouse_node.query("DETACH DATABASE kill_mysql_while_insert") clickhouse_node.query_with_retry("DETACH DATABASE kill_mysql_while_insert")
clickhouse_node.query("ATTACH DATABASE kill_mysql_while_insert") clickhouse_node.query("ATTACH DATABASE kill_mysql_while_insert")
result = mysql_node.query_and_get_data("SELECT COUNT(1) FROM kill_mysql_while_insert.test") result = mysql_node.query_and_get_data("SELECT COUNT(1) FROM kill_mysql_while_insert.test")
@ -1072,3 +1072,68 @@ def table_overrides(clickhouse_node, mysql_node, service_name):
check_query(clickhouse_node, "SELECT type FROM system.columns WHERE database = 'table_overrides' AND table = 't1' AND name = 'sensor_id'", "UInt64\n") check_query(clickhouse_node, "SELECT type FROM system.columns WHERE database = 'table_overrides' AND table = 't1' AND name = 'sensor_id'", "UInt64\n")
clickhouse_node.query("DROP DATABASE IF EXISTS table_overrides") clickhouse_node.query("DROP DATABASE IF EXISTS table_overrides")
mysql_node.query("DROP DATABASE IF EXISTS table_overrides") mysql_node.query("DROP DATABASE IF EXISTS table_overrides")
def materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_datatype")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_datatype")
mysql_node.query("CREATE DATABASE test_database_datatype DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("""
CREATE TABLE test_database_datatype.t1 (
`v1` int(10) unsigned AUTO_INCREMENT,
`v2` TINYINT,
`v3` SMALLINT,
`v4` BIGINT,
`v5` int,
`v6` TINYINT unsigned,
`v7` SMALLINT unsigned,
`v8` BIGINT unsigned,
`v9` FLOAT,
`v10` FLOAT unsigned,
`v11` DOUBLE,
`v12` DOUBLE unsigned,
`v13` DECIMAL(5,4),
`v14` date,
`v15` TEXT,
`v16` varchar(100) ,
`v17` BLOB,
`v18` datetime DEFAULT CURRENT_TIMESTAMP,
`v19` datetime(6) DEFAULT CURRENT_TIMESTAMP(6),
`v20` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`v21` TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
/* todo support */
# `v22` YEAR,
# `v23` TIME,
# `v24` TIME(3),
# `v25` GEOMETRY,
`v26` bit(4),
# `v27` JSON DEFAULT NULL,
# `v28` set('a', 'c', 'f', 'd', 'e', 'b'),
`v29` mediumint(4) unsigned NOT NULL DEFAULT '0',
`v30` varbinary(255) DEFAULT NULL COMMENT 'varbinary support',
`v31` binary(200) DEFAULT NULL,
`v32` ENUM('RED','GREEN','BLUE'),
PRIMARY KEY (`v1`)
) ENGINE=InnoDB;
""")
mysql_node.query("""
INSERT INTO test_database_datatype.t1 (v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v31, v32) values
(1, 11, 9223372036854775807, -1, 1, 11, 18446744073709551615, -1.1, 1.1, -1.111, 1.111, 1.1111, '2021-10-06', 'text', 'varchar', 'BLOB', '2021-10-06 18:32:57', '2021-10-06 18:32:57.482786', '2021-10-06 18:32:57', '2021-10-06 18:32:57.482786', b'1010', 11, 'varbinary', 'binary', 'RED');
""")
clickhouse_node.query(
"CREATE DATABASE test_database_datatype ENGINE = MaterializeMySQL('{}:3306', 'test_database_datatype', 'root', 'clickhouse')".format(
service_name))
check_query(clickhouse_node, "SELECT name FROM system.tables WHERE database = 'test_database_datatype'", "t1\n")
# full synchronization check
check_query(clickhouse_node, "SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v32 FROM test_database_datatype.t1 FORMAT TSV",
"1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t10\t11\tvarbinary\tRED\n")
mysql_node.query("""
INSERT INTO test_database_datatype.t1 (v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v31, v32) values
(2, 22, 9223372036854775807, -2, 2, 22, 18446744073709551615, -2.2, 2.2, -2.22, 2.222, 2.2222, '2021-10-07', 'text', 'varchar', 'BLOB', '2021-10-07 18:32:57', '2021-10-07 18:32:57.482786', '2021-10-07 18:32:57', '2021-10-07 18:32:57.482786', b'1011', 22, 'varbinary', 'binary', 'GREEN' );
""")
    # incremental synchronization check
check_query(clickhouse_node, "SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v32 FROM test_database_datatype.t1 ORDER BY v1 FORMAT TSV",
"1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t10\t11\tvarbinary\tRED\n" +
"2\t2\t22\t9223372036854775807\t-2\t2\t22\t18446744073709551615\t-2.2\t2.2\t-2.22\t2.222\t2.2222\t2021-10-07\ttext\tvarchar\tBLOB\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t11\t22\tvarbinary\tGREEN\n")

View File

@ -253,3 +253,7 @@ def test_table_table(started_cluster, started_mysql_8_0, started_mysql_5_7, clic
def test_table_overrides(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node): def test_table_overrides(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_5_7, "mysql57") materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_8_0, "mysql80") materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_8_0, "mysql80")
def test_materialized_database_support_all_kinds_of_mysql_datatype(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_5_7, "mysql57")

View File

@ -456,3 +456,16 @@ def test_s3_disk_reads_on_unstable_connection(cluster, node_name):
for i in range(30): for i in range(30):
print(f"Read sequence {i}") print(f"Read sequence {i}")
assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"] assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"]
@pytest.mark.parametrize("node_name", ["node"])
def test_lazy_seek_optimization_for_async_read(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node.query("CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';")
node.query("INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 10000000")
node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10")
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)

View File

@ -37,7 +37,6 @@ def fail_request(cluster, request):
["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)]) ["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)])
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response) assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
def throttle_request(cluster, request): def throttle_request(cluster, request):
response = cluster.exec_in_container(cluster.get_container_id('resolver'), response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/throttle_request/{}".format(request)]) ["curl", "-s", "http://resolver:8080/throttle_request/{}".format(request)])

View File

@ -7,6 +7,7 @@ import time
import pytest import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir from helpers.cluster import ClickHouseCluster, get_instances_dir
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml'.format(get_instances_dir())) NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml'.format(get_instances_dir()))
COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/clusters.xml"] COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/clusters.xml"]

View File

@ -36,7 +36,6 @@ def get_query_stat(instance, hint):
result[ev[0]] = int(ev[1]) result[ev[0]] = int(ev[1])
return result return result
@pytest.mark.parametrize("min_rows_for_wide_part,read_requests", [(0, 2), (8192, 1)]) @pytest.mark.parametrize("min_rows_for_wide_part,read_requests", [(0, 2), (8192, 1)])
def test_write_is_cached(cluster, min_rows_for_wide_part, read_requests): def test_write_is_cached(cluster, min_rows_for_wide_part, read_requests):
node = cluster.instances["node"] node = cluster.instances["node"]

View File

@ -65,7 +65,6 @@ def create_table(cluster, additional_settings=None):
list(cluster.instances.values())[0].query(create_table_statement) list(cluster.instances.values())[0].query(create_table_statement)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def drop_table(cluster): def drop_table(cluster):
yield yield

View File

@ -67,8 +67,8 @@ def rabbitmq_cluster():
def rabbitmq_setup_teardown(): def rabbitmq_setup_teardown():
print("RabbitMQ is available - running test") print("RabbitMQ is available - running test")
yield # run test yield # run test
for table_name in ['view', 'consumer', 'rabbitmq']: instance.query('DROP DATABASE test NO DELAY')
instance.query(f'DROP TABLE IF EXISTS test.{table_name}') instance.query('CREATE DATABASE test')
# Tests # Tests

View File

@ -11,6 +11,7 @@ import helpers.client
import pytest import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir
from helpers.network import PartitionManager from helpers.network import PartitionManager
from helpers.test_tools import exec_query_with_retry
MINIO_INTERNAL_PORT = 9001 MINIO_INTERNAL_PORT = 9001
@ -809,7 +810,7 @@ def test_seekable_formats(started_cluster):
assert(int(result) == 5000000) assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')" table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)") exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
result = instance.query(f"SELECT count() FROM {table_function}") result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000) assert(int(result) == 5000000)
@ -832,7 +833,7 @@ def test_seekable_formats_url(started_cluster):
assert(int(result) == 5000000) assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')" table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)") exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')" table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}") result = instance.query(f"SELECT count() FROM {table_function}")
@ -842,3 +843,18 @@ def test_seekable_formats_url(started_cluster):
result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM url') AND memory_usage > 0 ORDER BY event_time desc") result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM url') AND memory_usage > 0 ORDER BY event_time desc")
print(result[:3]) print(result[:3])
assert(int(result[:3]) < 200) assert(int(result[:3]) < 200)
def test_empty_file(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
name = "empty"
url = f'http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{name}'
minio = started_cluster.minio_client
minio.put_object(bucket, name, io.BytesIO(b""), 0)
table_function = f"s3('{url}', 'CSV', 'id Int32')"
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 0)

View File

@ -46,22 +46,6 @@ select x1, x2, x3 from test order by 3 limit 1 by 1;
100 100 1 100 100 1
10 1 10 10 1 10
1 10 100 1 10 100
explain syntax select x3, x2, x1 from test order by 1 + 1;
SELECT
x3,
x2,
x1
FROM test
ORDER BY x3 + x3 ASC
explain syntax select x3, x2, x1 from test order by (1 + 1) * 3;
SELECT
x3,
x2,
x1
FROM test
ORDER BY (x3 + x3) * x1 ASC
select x2, x1 from test group by x2 + x1; -- { serverError 215 }
select x2, x1 from test group by 1 + 2; -- { serverError 215 }
explain syntax select x3, x2, x1 from test order by 1; explain syntax select x3, x2, x1 from test order by 1;
SELECT SELECT
x3, x3,
@ -110,27 +94,6 @@ GROUP BY
x2 x2
select max(x1), x2 from test group by 1, 2; -- { serverError 43 } select max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 } select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select x1 + x2, x3 from test group by x1 + x2, x3;
11 100
200 1
11 200
11 10
select x3, x2, x1 from test order by x3 * 2, x2, x1; -- check x3 * 2 does not become x3 * x2
1 100 100
1 100 100
10 1 10
100 10 1
200 1 10
200 10 1
explain syntax select x1, x3 from test group by 1 + 2, 1, 2;
SELECT
x1,
x3
FROM test
GROUP BY
x1 + x3,
x1,
x3
explain syntax select x1 + x3, x3 from test group by 1, 2; explain syntax select x1 + x3, x3 from test group by 1, 2;
SELECT SELECT
x1 + x3, x1 + x3,
@ -152,3 +115,5 @@ SELECT 1 + 1 AS a
GROUP BY a GROUP BY a
select substr('aaaaaaaaaaaaaa', 8) as a group by a; select substr('aaaaaaaaaaaaaa', 8) as a group by a;
aaaaaaa aaaaaaa
select substr('aaaaaaaaaaaaaa', 8) as a group by substr('aaaaaaaaaaaaaa', 8);
aaaaaaa

View File

@ -22,12 +22,6 @@ select x1, x2, x3 from test order by 3 limit 1 by 3;
select x1, x2, x3 from test order by x3 limit 1 by x1; select x1, x2, x3 from test order by x3 limit 1 by x1;
select x1, x2, x3 from test order by 3 limit 1 by 1; select x1, x2, x3 from test order by 3 limit 1 by 1;
explain syntax select x3, x2, x1 from test order by 1 + 1;
explain syntax select x3, x2, x1 from test order by (1 + 1) * 3;
select x2, x1 from test group by x2 + x1; -- { serverError 215 }
select x2, x1 from test group by 1 + 2; -- { serverError 215 }
explain syntax select x3, x2, x1 from test order by 1; explain syntax select x3, x2, x1 from test order by 1;
explain syntax select x3 + 1, x2, x1 from test order by 1; explain syntax select x3 + 1, x2, x1 from test order by 1;
explain syntax select x3, x3 - x2, x2, x1 from test order by 2; explain syntax select x3, x3 - x2, x2, x1 from test order by 2;
@ -37,11 +31,7 @@ explain syntax select 1 + greatest(x1, 1), x2 from test group by 1, 2;
select max(x1), x2 from test group by 1, 2; -- { serverError 43 } select max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 } select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select x1 + x2, x3 from test group by x1 + x2, x3;
select x3, x2, x1 from test order by x3 * 2, x2, x1; -- check x3 * 2 does not become x3 * x2
explain syntax select x1, x3 from test group by 1 + 2, 1, 2;
explain syntax select x1 + x3, x3 from test group by 1, 2; explain syntax select x1 + x3, x3 from test group by 1, 2;
create table test2(x1 Int, x2 Int, x3 Int) engine=Memory; create table test2(x1 Int, x2 Int, x3 Int) engine=Memory;
@ -52,3 +42,5 @@ select a, b, c, d, e, f from (select 44 a, 88 b, 13 c, 14 d, 15 e, 16 f) t grou
explain syntax select plus(1, 1) as a group by a; explain syntax select plus(1, 1) as a group by a;
select substr('aaaaaaaaaaaaaa', 8) as a group by a; select substr('aaaaaaaaaaaaaa', 8) as a group by a;
select substr('aaaaaaaaaaaaaa', 8) as a group by substr('aaaaaaaaaaaaaa', 8);

View File

@ -0,0 +1,6 @@
127.0.0.1 IPv4
127.0.0.1 String
2001:db8:0:85a3::ac1f:8001 IPv6
2001:db8:0:85a3::ac1f:8001 String
0.0.0.0 IPv4
:: IPv6

View File

@ -0,0 +1,13 @@
SELECT CAST('127.0.0.1' as IPv4) as v, toTypeName(v);
SELECT CAST(toIPv4('127.0.0.1') as String) as v, toTypeName(v);
SELECT CAST('2001:0db8:0000:85a3:0000:0000:ac1f:8001' as IPv6) as v, toTypeName(v);
SELECT CAST(toIPv6('2001:0db8:0000:85a3:0000:0000:ac1f:8001') as String) as v, toTypeName(v);
SELECT toIPv4('hello') as v, toTypeName(v);
SELECT toIPv6('hello') as v, toTypeName(v);
SELECT CAST('hello' as IPv4) as v, toTypeName(v); -- { serverError CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING }
SELECT CAST('hello' as IPv6) as v, toTypeName(v); -- { serverError CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING }
SELECT CAST('1.1.1.1' as IPv6) as v, toTypeName(v); -- { serverError CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING }

View File

@ -302,11 +302,44 @@ def insertLowCardinalityRowWithIncorrectDictType():
print(readException(s)) print(readException(s))
s.close() s.close()
def insertLowCardinalityRowWithIncorrectAdditionalKeys():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(30)
s.connect((CLICKHOUSE_HOST, CLICKHOUSE_PORT))
sendHello(s)
receiveHello(s)
sendQuery(s, 'insert into {}.tab format TSV'.format(CLICKHOUSE_DATABASE))
# external tables
sendEmptyBlock(s)
readHeader(s)
# Data
ba = bytearray()
writeVarUInt(2, ba) # Data
writeStringBinary('', ba)
serializeBlockInfo(ba)
writeVarUInt(1, ba) # rows
writeVarUInt(1, ba) # columns
writeStringBinary('x', ba)
writeStringBinary('LowCardinality(String)', ba)
ba.extend([1] + [0] * 7) # SharedDictionariesWithAdditionalKeys
ba.extend([3, 0] + [0] * 6) # indexes type: UInt64 [3], with NO additional keys [0]
ba.extend([1] + [0] * 7) # num_keys in dict
writeStringBinary('hello', ba) # key
ba.extend([1] + [0] * 7) # num_indexes
ba.extend([0] * 8) # UInt64 index (0 for 'hello')
s.sendall(ba)
assertPacket(readVarUInt(s), 2)
print(readException(s))
s.close()
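The new test above builds little-endian UInt64 values by hand with ba.extend; the same bytes can be produced with int.to_bytes, shown here only as an illustrative cross-check (uint64_le is not used by the test):

    def uint64_le(value: int) -> bytes:
        return value.to_bytes(8, byteorder="little")

    assert uint64_le(1) == bytes([1] + [0] * 7)  # SharedDictionariesWithAdditionalKeys
    assert uint64_le(3) == bytes([3] + [0] * 7)  # UInt64 indexes, no additional-keys flag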
def main(): def main():
insertValidLowCardinalityRow() insertValidLowCardinalityRow()
insertLowCardinalityRowWithIndexOverflow() insertLowCardinalityRowWithIndexOverflow()
insertLowCardinalityRowWithIncorrectDictType() insertLowCardinalityRowWithIncorrectDictType()
insertLowCardinalityRowWithIncorrectAdditionalKeys()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -6,3 +6,6 @@ code 117: Index for LowCardinality is out of range. Dictionary size is 1, but f
Rows 0 Columns 1 Rows 0 Columns 1
Column x type LowCardinality(String) Column x type LowCardinality(String)
code 117: LowCardinality indexes serialization type for Native format cannot use global dictionary code 117: LowCardinality indexes serialization type for Native format cannot use global dictionary
Rows 0 Columns 1
Column x type LowCardinality(String)
code 117: No additional keys found.