Merge branch 'sentry' of github.com:blinkov/ClickHouse; branch 'master' of github.com:ClickHouse/ClickHouse into sentry

Ivan Blinkov · 2020-06-01 14:14:44 +03:00
commit 89212fb174
21 changed files with 212 additions and 128 deletions

View File

@@ -24,8 +24,6 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
     ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
-    ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
-    ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \
     ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/; \

View File

@@ -59,9 +59,7 @@ ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/con
     ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
-    ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
-    ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \
     ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/; \

View File

@@ -62,9 +62,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
     ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
-    ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
-    ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \

View File

@@ -50,9 +50,7 @@ ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/con
     ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
-    ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
-    ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \

View File

@@ -31,7 +31,6 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
     dpkg -i package_folder/clickhouse-server_*.deb; \
    dpkg -i package_folder/clickhouse-client_*.deb; \
    dpkg -i package_folder/clickhouse-test_*.deb; \
-    ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
     ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
     ln -s /usr/lib/llvm-9/bin/llvm-symbolizer /usr/bin/llvm-symbolizer; \
     echo "TSAN_OPTIONS='halt_on_error=1 history_size=7 ignore_noninstrumented_modules=1 verbosity=1'" >> /etc/environment; \

View File

@@ -333,6 +333,17 @@ void ColumnDecimal<T>::getExtremes(Field & min, Field & max) const
     max = NearestFieldType<T>(cur_max, scale);
 }
 
+TypeIndex columnDecimalDataType(const IColumn * column)
+{
+    if (checkColumn<ColumnDecimal<Decimal32>>(column))
+        return TypeIndex::Decimal32;
+    else if (checkColumn<ColumnDecimal<Decimal64>>(column))
+        return TypeIndex::Decimal64;
+    else if (checkColumn<ColumnDecimal<Decimal128>>(column))
+        return TypeIndex::Decimal128;
+    return TypeIndex::Nothing;
+}
+
 template class ColumnDecimal<Decimal32>;
 template class ColumnDecimal<Decimal64>;
 template class ColumnDecimal<Decimal128>;

View File

@@ -197,4 +197,6 @@ ColumnPtr ColumnDecimal<T>::indexImpl(const PaddedPODArray<Type> & indexes, size
     return res;
 }
 
+TypeIndex columnDecimalDataType(const IColumn * column);
+
 }

View File

@@ -517,6 +517,33 @@ void ColumnVector<T>::getExtremes(Field & min, Field & max) const
     max = NearestFieldType<T>(cur_max);
 }
 
+TypeIndex columnVectorDataType(const IColumn * column)
+{
+    if (checkColumn<ColumnVector<UInt8>>(column))
+        return TypeIndex::UInt8;
+    else if (checkColumn<ColumnVector<UInt16>>(column))
+        return TypeIndex::UInt16;
+    else if (checkColumn<ColumnVector<UInt32>>(column))
+        return TypeIndex::UInt32;
+    else if (checkColumn<ColumnVector<UInt64>>(column))
+        return TypeIndex::UInt64;
+    else if (checkColumn<ColumnVector<Int8>>(column))
+        return TypeIndex::Int8;
+    else if (checkColumn<ColumnVector<Int16>>(column))
+        return TypeIndex::Int16;
+    else if (checkColumn<ColumnVector<Int32>>(column))
+        return TypeIndex::Int32;
+    else if (checkColumn<ColumnVector<Int64>>(column))
+        return TypeIndex::Int64;
+    else if (checkColumn<ColumnVector<Int128>>(column))
+        return TypeIndex::Int128;
+    else if (checkColumn<ColumnVector<Float32>>(column))
+        return TypeIndex::Float32;
+    else if (checkColumn<ColumnVector<Float64>>(column))
+        return TypeIndex::Float64;
+    return TypeIndex::Nothing;
+}
+
 /// Explicit template instantiations - to avoid code bloat in headers.
 template class ColumnVector<UInt8>;
 template class ColumnVector<UInt16>;

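Note: together, columnVectorDataType and columnDecimalDataType give callers one cheap way to classify an arbitrary numeric column at runtime. A minimal usage sketch (the wrapper function below is hypothetical, not part of this commit):

    TypeIndex classifyNumericColumn(const IColumn * column)
    {
        TypeIndex idx = columnVectorDataType(column);   /// plain vector types: UInt8..UInt64, Int8..Int128, Float32/64
        if (idx == TypeIndex::Nothing)
            idx = columnDecimalDataType(column);        /// fall back to Decimal32/64/128
        return idx;                                     /// TypeIndex::Nothing if neither matches
    }

This is exactly the probing order that AsofRowRefs::getTypeSize adopts further down in this commit.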
View File

@@ -320,4 +320,6 @@ ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_
     return res;
 }
 
+TypeIndex columnVectorDataType(const IColumn * column);
+
 }

View File

@@ -23,10 +23,20 @@ namespace
 {
 
 StoragePtr createStorageDictionary(const String & database_name, const ExternalLoader::LoadResult & load_result)
 {
-    if (!load_result.config)
-        return nullptr;
-    DictionaryStructure dictionary_structure = ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
-    return StorageDictionary::create(StorageID(database_name, load_result.name), load_result.name, dictionary_structure);
+    try
+    {
+        if (!load_result.config)
+            return nullptr;
+        DictionaryStructure dictionary_structure = ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
+        return StorageDictionary::create(StorageID(database_name, load_result.name), load_result.name, dictionary_structure);
+    }
+    catch (Exception & e)
+    {
+        throw Exception(
+            fmt::format("Error while loading dictionary '{}.{}': {}",
+                database_name, load_result.name, e.displayText()),
+            e.code());
+    }
 }
 }

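Note: the change above wraps the body of createStorageDictionary in a try/catch that prepends the database and dictionary name to the message and rethrows with the original error code, so failures during dictionary attachment are attributable. The same wrap-and-rethrow pattern in isolation (a sketch; doLoad and name are placeholders):

    try
    {
        doLoad();
    }
    catch (Exception & e)
    {
        /// Keep e.code() so callers matching on error codes are unaffected; only the message gains context.
        throw Exception(fmt::format("Error while loading '{}': {}", name, e.displayText()), e.code());
    }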
View File

@@ -192,7 +192,7 @@ public:
 
     ASTTableJoin::Kind getKind() const { return kind; }
     ASTTableJoin::Strictness getStrictness() const { return strictness; }
-    AsofRowRefs::Type getAsofType() const { return *asof_type; }
+    TypeIndex getAsofType() const { return *asof_type; }
     ASOF::Inequality getAsofInequality() const { return asof_inequality; }
 
     bool anyTakeLastRow() const { return any_take_last_row; }
@@ -344,7 +344,7 @@ private:
     bool nullable_right_side; /// In case of LEFT and FULL joins, if use_nulls, convert right-side columns to Nullable.
     bool nullable_left_side; /// In case of RIGHT and FULL joins, if use_nulls, convert left-side columns to Nullable.
     bool any_take_last_row; /// Overwrite existing values when encountering the same key again
-    std::optional<AsofRowRefs::Type> asof_type;
+    std::optional<TypeIndex> asof_type;
     ASOF::Inequality asof_inequality;
 
     /// Right table data. StorageJoin shares it between many Join objects.

View File

@@ -17,19 +17,25 @@ namespace
 {
 /// maps enum values to types
 template <typename F>
-void callWithType(AsofRowRefs::Type which, F && f)
+void callWithType(TypeIndex which, F && f)
 {
     switch (which)
     {
-        case AsofRowRefs::Type::keyu32: return f(UInt32());
-        case AsofRowRefs::Type::keyu64: return f(UInt64());
-        case AsofRowRefs::Type::keyi32: return f(Int32());
-        case AsofRowRefs::Type::keyi64: return f(Int64());
-        case AsofRowRefs::Type::keyf32: return f(Float32());
-        case AsofRowRefs::Type::keyf64: return f(Float64());
-        case AsofRowRefs::Type::keyDecimal32: return f(Decimal32());
-        case AsofRowRefs::Type::keyDecimal64: return f(Decimal64());
-        case AsofRowRefs::Type::keyDecimal128: return f(Decimal128());
+        case TypeIndex::UInt8: return f(UInt8());
+        case TypeIndex::UInt16: return f(UInt16());
+        case TypeIndex::UInt32: return f(UInt32());
+        case TypeIndex::UInt64: return f(UInt64());
+        case TypeIndex::Int8: return f(Int8());
+        case TypeIndex::Int16: return f(Int16());
+        case TypeIndex::Int32: return f(Int32());
+        case TypeIndex::Int64: return f(Int64());
+        case TypeIndex::Float32: return f(Float32());
+        case TypeIndex::Float64: return f(Float64());
+        case TypeIndex::Decimal32: return f(Decimal32());
+        case TypeIndex::Decimal64: return f(Decimal64());
+        case TypeIndex::Decimal128: return f(Decimal128());
+        default:
+            break;
     }
 
     __builtin_unreachable();
@@ -38,7 +44,7 @@ void callWithType(AsofRowRefs::Type which, F && f)
 }
 
 
-AsofRowRefs::AsofRowRefs(Type type)
+AsofRowRefs::AsofRowRefs(TypeIndex type)
 {
     auto call = [&](const auto & t)
     {
@@ -50,7 +56,7 @@ AsofRowRefs::AsofRowRefs(Type type)
     callWithType(type, call);
 }
 
-void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num)
+void AsofRowRefs::insert(TypeIndex type, const IColumn * asof_column, const Block * block, size_t row_num)
 {
     auto call = [&](const auto & t)
     {
@@ -68,7 +74,7 @@ void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * b
     callWithType(type, call);
 }
 
-const RowRef * AsofRowRefs::findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const
+const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const
 {
     const RowRef * out = nullptr;
 
@@ -96,52 +102,56 @@ const RowRef * AsofRowRefs::findAsof(Type type, ASOF::Inequality inequality, con
     return out;
 }
 
-std::optional<AsofRowRefs::Type> AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size)
+std::optional<TypeIndex> AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size)
 {
-    if (typeid_cast<const ColumnVector<UInt32> *>(asof_column))
+    TypeIndex idx = columnVectorDataType(asof_column);
+    if (idx == TypeIndex::Nothing)
+        idx = columnDecimalDataType(asof_column);
+
+    switch (idx)
     {
-        size = sizeof(UInt32);
-        return Type::keyu32;
-    }
-    else if (typeid_cast<const ColumnVector<UInt64> *>(asof_column))
-    {
-        size = sizeof(UInt64);
-        return Type::keyu64;
-    }
-    else if (typeid_cast<const ColumnVector<Int32> *>(asof_column))
-    {
-        size = sizeof(Int32);
-        return Type::keyi32;
-    }
-    else if (typeid_cast<const ColumnVector<Int64> *>(asof_column))
-    {
-        size = sizeof(Int64);
-        return Type::keyi64;
-    }
-    else if (typeid_cast<const ColumnVector<Float32> *>(asof_column))
-    {
-        size = sizeof(Float32);
-        return Type::keyf32;
-    }
-    else if (typeid_cast<const ColumnVector<Float64> *>(asof_column))
-    {
-        size = sizeof(Float64);
-        return Type::keyf64;
-    }
-    else if (typeid_cast<const ColumnDecimal<Decimal32> *>(asof_column))
-    {
-        size = sizeof(Decimal32);
-        return Type::keyDecimal32;
-    }
-    else if (typeid_cast<const ColumnDecimal<Decimal64> *>(asof_column))
-    {
-        size = sizeof(Decimal64);
-        return Type::keyDecimal64;
-    }
-    else if (typeid_cast<const ColumnDecimal<Decimal128> *>(asof_column))
-    {
-        size = sizeof(Decimal128);
-        return Type::keyDecimal128;
+        case TypeIndex::UInt8:
+            size = sizeof(UInt8);
+            return idx;
+        case TypeIndex::UInt16:
+            size = sizeof(UInt16);
+            return idx;
+        case TypeIndex::UInt32:
+            size = sizeof(UInt32);
+            return idx;
+        case TypeIndex::UInt64:
+            size = sizeof(UInt64);
+            return idx;
+        case TypeIndex::Int8:
+            size = sizeof(Int8);
+            return idx;
+        case TypeIndex::Int16:
+            size = sizeof(Int16);
+            return idx;
+        case TypeIndex::Int32:
+            size = sizeof(Int32);
+            return idx;
+        case TypeIndex::Int64:
+            size = sizeof(Int64);
+            return idx;
+        //case TypeIndex::Int128:
+        case TypeIndex::Float32:
+            size = sizeof(Float32);
+            return idx;
+        case TypeIndex::Float64:
+            size = sizeof(Float64);
+            return idx;
+        case TypeIndex::Decimal32:
+            size = sizeof(Decimal32);
+            return idx;
+        case TypeIndex::Decimal64:
+            size = sizeof(Decimal64);
+            return idx;
+        case TypeIndex::Decimal128:
+            size = sizeof(Decimal128);
+            return idx;
+        default:
+            break;
    }
 
     size = 0;

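Note: with the ad-hoc Type enum gone, a caller drives the ASOF machinery roughly as follows (a sketch assembled from the public API in RowRefs.h below; the variable names and the error handling are illustrative, not code from this commit):

    size_t type_size = 0;
    std::optional<TypeIndex> asof_type = AsofRowRefs::getTypeSize(asof_column, type_size);
    if (!asof_type)
        throw Exception("ASOF join key must be a supported numeric or decimal column", ErrorCodes::BAD_TYPE_OF_FIELD);

    AsofRowRefs refs(*asof_type);
    refs.insert(*asof_type, asof_column, block, row_num);   /// build side: remember this row under its ASOF key
    const RowRef * found = refs.findAsof(*asof_type, inequality, asof_column, row_num);   /// probe side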
View File

@@ -216,8 +216,12 @@ public:
     };
 
     using Lookups = std::variant<
+        Entry<UInt8>::LookupPtr,
+        Entry<UInt16>::LookupPtr,
         Entry<UInt32>::LookupPtr,
         Entry<UInt64>::LookupPtr,
+        Entry<Int8>::LookupPtr,
+        Entry<Int16>::LookupPtr,
         Entry<Int32>::LookupPtr,
         Entry<Int64>::LookupPtr,
         Entry<Float32>::LookupPtr,
@@ -226,29 +230,16 @@ public:
         Entry<Decimal64>::LookupPtr,
         Entry<Decimal128>::LookupPtr>;
 
-    enum class Type
-    {
-        keyu32,
-        keyu64,
-        keyi32,
-        keyi64,
-        keyf32,
-        keyf64,
-        keyDecimal32,
-        keyDecimal64,
-        keyDecimal128,
-    };
-
     AsofRowRefs() {}
-    AsofRowRefs(Type t);
+    AsofRowRefs(TypeIndex t);
 
-    static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);
+    static std::optional<TypeIndex> getTypeSize(const IColumn * asof_column, size_t & type_size);
 
     // This will be synchronized by the rwlock mutex in Join.h
-    void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);
+    void insert(TypeIndex type, const IColumn * asof_column, const Block * block, size_t row_num);
 
     // This will internally synchronize
-    const RowRef * findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;
+    const RowRef * findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;
 
 private:
     // Lookups can be stored in a HashTable because it is memmovable

View File

@@ -19,9 +19,8 @@ KafkaBlockInputStream::KafkaBlockInputStream(
     , column_names(columns)
     , max_block_size(max_block_size_)
     , commit_in_suffix(commit_in_suffix_)
-    , non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support
-    , virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}))
+    , non_virtual_header(storage.getSampleBlockNonMaterialized())
+    , virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp","_timestamp_ms","_headers.name","_headers.value"}))
 {
     context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
     context.setSetting("input_format_allow_errors_ratio", 0.);
@@ -141,8 +140,22 @@ Block KafkaBlockInputStream::readImpl()
         auto offset = buffer->currentOffset();
         auto partition = buffer->currentPartition();
         auto timestamp_raw = buffer->currentTimestamp();
-        auto timestamp = timestamp_raw ? std::chrono::duration_cast<std::chrono::seconds>(timestamp_raw->get_timestamp()).count()
-                                       : 0;
+        auto header_list = buffer->currentHeaderList();
+
+        Array headers_names;
+        Array headers_values;
+
+        if (!header_list.empty())
+        {
+            headers_names.reserve(header_list.size());
+            headers_values.reserve(header_list.size());
+            for (const auto & header : header_list)
+            {
+                headers_names.emplace_back(header.get_name());
+                headers_values.emplace_back(static_cast<std::string>(header.get_value()));
+            }
+        }
+
         for (size_t i = 0; i < new_rows; ++i)
         {
             virtual_columns[0]->insert(topic);
@@ -151,12 +164,17 @@ Block KafkaBlockInputStream::readImpl()
             virtual_columns[3]->insert(partition);
             if (timestamp_raw)
             {
-                virtual_columns[4]->insert(timestamp);
+                auto ts = timestamp_raw->get_timestamp();
+                virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(ts).count());
+                virtual_columns[5]->insert(DecimalField<Decimal64>(std::chrono::duration_cast<std::chrono::milliseconds>(ts).count(), 3));
             }
             else
             {
                 virtual_columns[4]->insertDefault();
+                virtual_columns[5]->insertDefault();
             }
+            virtual_columns[6]->insert(headers_names);
+            virtual_columns[7]->insert(headers_values);
         }
 
         total_rows = total_rows + new_rows;

View File

@@ -49,6 +49,7 @@ public:
     auto currentOffset() const { return current[-1].get_offset(); }
     auto currentPartition() const { return current[-1].get_partition(); }
     auto currentTimestamp() const { return current[-1].get_timestamp(); }
+    const auto & currentHeaderList() const { return current[-1].get_header_list(); }
 
 private:
     using Messages = std::vector<cppkafka::Message>;

View File

@@ -6,9 +6,11 @@
 #include <DataStreams/UnionBlockInputStream.h>
 #include <DataStreams/copyData.h>
 #include <DataTypes/DataTypeDateTime.h>
+#include <DataTypes/DataTypeDateTime64.h>
 #include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypesNumber.h>
 #include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypeArray.h>
 #include <Interpreters/InterpreterInsertQuery.h>
 #include <Interpreters/evaluateConstantExpression.h>
 #include <Parsers/ASTCreateQuery.h>
@@ -747,7 +749,10 @@ NamesAndTypesList StorageKafka::getVirtuals() const
         {"_key", std::make_shared<DataTypeString>()},
         {"_offset", std::make_shared<DataTypeUInt64>()},
         {"_partition", std::make_shared<DataTypeUInt64>()},
-        {"_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())}
+        {"_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
+        {"_timestamp_ms", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime64>(3))},
+        {"_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
+        {"_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}
     };
 }

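Note: _timestamp_ms is declared as Nullable(DateTime64(3)) because DateTime64 with scale 3 is backed by a Decimal64 whose raw value is the millisecond count. That is why the reader can insert the cppkafka timestamp directly, as the KafkaBlockInputStream.cpp hunk above does (a sketch of just that conversion; ts is the message timestamp):

    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(ts).count();
    virtual_columns[5]->insert(DecimalField<Decimal64>(ms, 3));   /// raw value = milliseconds, scale = 3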
View File

@@ -1,7 +0,0 @@
-<yandex>
-    <profiles>
-        <default>
-            <log_queries>1</log_queries>
-        </default>
-    </profiles>
-</yandex>

View File

@@ -1,8 +0,0 @@
-<yandex>
-    <metric_log>
-        <database>system</database>
-        <table>metric_log</table>
-        <flush_interval_milliseconds>7500</flush_interval_milliseconds>
-        <collect_interval_milliseconds>1000</collect_interval_milliseconds>
-    </metric_log>
-</yandex>

View File

@@ -876,28 +876,28 @@ def test_kafka_virtual_columns2(kafka_cluster):
             kafka_format = 'JSONEachRow';
 
         CREATE MATERIALIZED VIEW test.view Engine=Log AS
-        SELECT value, _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp) FROM test.kafka;
+        SELECT value, _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp), toUnixTimestamp64Milli(_timestamp_ms), _headers.name, _headers.value FROM test.kafka;
         ''')
 
     producer = KafkaProducer(bootstrap_servers="localhost:9092")
 
-    producer.send(topic='virt2_0', value=json.dumps({'value': 1}), partition=0, key='k1', timestamp_ms=1577836801000)
-    producer.send(topic='virt2_0', value=json.dumps({'value': 2}), partition=0, key='k2', timestamp_ms=1577836802000)
+    producer.send(topic='virt2_0', value=json.dumps({'value': 1}), partition=0, key='k1', timestamp_ms=1577836801001, headers=[('content-encoding', b'base64')])
+    producer.send(topic='virt2_0', value=json.dumps({'value': 2}), partition=0, key='k2', timestamp_ms=1577836802002, headers=[('empty_value', ''),('', 'empty name'), ('',''), ('repetition', '1'), ('repetition', '2')])
     producer.flush()
     time.sleep(1)
 
-    producer.send(topic='virt2_0', value=json.dumps({'value': 3}), partition=1, key='k3', timestamp_ms=1577836803000)
-    producer.send(topic='virt2_0', value=json.dumps({'value': 4}), partition=1, key='k4', timestamp_ms=1577836804000)
+    producer.send(topic='virt2_0', value=json.dumps({'value': 3}), partition=1, key='k3', timestamp_ms=1577836803003, headers=[('b', 'b'),('a', 'a')])
+    producer.send(topic='virt2_0', value=json.dumps({'value': 4}), partition=1, key='k4', timestamp_ms=1577836804004, headers=[('a', 'a'),('b', 'b')])
     producer.flush()
     time.sleep(1)
 
-    producer.send(topic='virt2_1', value=json.dumps({'value': 5}), partition=0, key='k5', timestamp_ms=1577836805000)
-    producer.send(topic='virt2_1', value=json.dumps({'value': 6}), partition=0, key='k6', timestamp_ms=1577836806000)
+    producer.send(topic='virt2_1', value=json.dumps({'value': 5}), partition=0, key='k5', timestamp_ms=1577836805005)
+    producer.send(topic='virt2_1', value=json.dumps({'value': 6}), partition=0, key='k6', timestamp_ms=1577836806006)
     producer.flush()
     time.sleep(1)
 
-    producer.send(topic='virt2_1', value=json.dumps({'value': 7}), partition=1, key='k7', timestamp_ms=1577836807000)
-    producer.send(topic='virt2_1', value=json.dumps({'value': 8}), partition=1, key='k8', timestamp_ms=1577836808000)
+    producer.send(topic='virt2_1', value=json.dumps({'value': 7}), partition=1, key='k7', timestamp_ms=1577836807007)
+    producer.send(topic='virt2_1', value=json.dumps({'value': 8}), partition=1, key='k8', timestamp_ms=1577836808008)
     producer.flush()
 
     time.sleep(10)
@@ -910,14 +910,14 @@ def test_kafka_virtual_columns2(kafka_cluster):
     result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)
 
     expected = '''\
-1	k1	virt2_0	0	0	1577836801
-2	k2	virt2_0	0	1	1577836802
-3	k3	virt2_0	1	0	1577836803
-4	k4	virt2_0	1	1	1577836804
-5	k5	virt2_1	0	0	1577836805
-6	k6	virt2_1	0	1	1577836806
-7	k7	virt2_1	1	0	1577836807
-8	k8	virt2_1	1	1	1577836808
+1	k1	virt2_0	0	0	1577836801	1577836801001	['content-encoding']	['base64']
+2	k2	virt2_0	0	1	1577836802	1577836802002	['empty_value','','','repetition','repetition']	['','empty name','','1','2']
+3	k3	virt2_0	1	0	1577836803	1577836803003	['b','a']	['b','a']
+4	k4	virt2_0	1	1	1577836804	1577836804004	['a','b']	['a','b']
+5	k5	virt2_1	0	0	1577836805	1577836805005	[]	[]
+6	k6	virt2_1	0	1	1577836806	1577836806006	[]	[]
+7	k7	virt2_1	1	0	1577836807	1577836807007	[]	[]
+8	k8	virt2_1	1	1	1577836808	1577836808008	[]	[]
 '''
 
     assert TSV(result) == TSV(expected)

View File

@@ -0,0 +1,13 @@
+0	1
+0	1
+0	1
+0	1
+0	1
+0	1
+0	1
+0	1
+0	1
+0	1
+0	1
+0	0000-00-00
+0	0000-00-00 00:00:00

View File

@@ -0,0 +1,18 @@
+select * from (select 0 as k, toInt8(1) as v) t1 asof join (select 0 as k, toInt8(0) as v) t2 using(k, v);
+select * from (select 0 as k, toInt16(1) as v) t1 asof join (select 0 as k, toInt16(0) as v) t2 using(k, v);
+select * from (select 0 as k, toInt32(1) as v) t1 asof join (select 0 as k, toInt32(0) as v) t2 using(k, v);
+select * from (select 0 as k, toInt64(1) as v) t1 asof join (select 0 as k, toInt64(0) as v) t2 using(k, v);
+
+select * from (select 0 as k, toUInt8(1) as v) t1 asof join (select 0 as k, toUInt8(0) as v) t2 using(k, v);
+select * from (select 0 as k, toUInt16(1) as v) t1 asof join (select 0 as k, toUInt16(0) as v) t2 using(k, v);
+select * from (select 0 as k, toUInt32(1) as v) t1 asof join (select 0 as k, toUInt32(0) as v) t2 using(k, v);
+select * from (select 0 as k, toUInt64(1) as v) t1 asof join (select 0 as k, toUInt64(0) as v) t2 using(k, v);
+
+select * from (select 0 as k, toDecimal32(1, 0) as v) t1 asof join (select 0 as k, toDecimal32(0, 0) as v) t2 using(k, v);
+select * from (select 0 as k, toDecimal64(1, 0) as v) t1 asof join (select 0 as k, toDecimal64(0, 0) as v) t2 using(k, v);
+select * from (select 0 as k, toDecimal128(1, 0) as v) t1 asof join (select 0 as k, toDecimal128(0, 0) as v) t2 using(k, v);
+
+select * from (select 0 as k, toDate(0) as v) t1 asof join (select 0 as k, toDate(0) as v) t2 using(k, v);
+select * from (select 0 as k, toDateTime(0) as v) t1 asof join (select 0 as k, toDateTime(0) as v) t2 using(k, v);
+
+select * from (select 0 as k, 'x' as v) t1 asof join (select 0 as k, 'x' as v) t2 using(k, v); -- { serverError 169 }