Merge branch 'master' into consistent_metadata3

This commit is contained in:
alesapin 2020-06-02 10:37:12 +03:00
commit 1ab3047201
27 changed files with 286 additions and 182 deletions

View File

@ -24,8 +24,6 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/; \

View File

@ -59,9 +59,7 @@ ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/con
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/; \

View File

@ -62,9 +62,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \

View File

@ -50,9 +50,7 @@ ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/con
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \

View File

@ -31,7 +31,6 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
dpkg -i package_folder/clickhouse-server_*.deb; \
dpkg -i package_folder/clickhouse-client_*.deb; \
dpkg -i package_folder/clickhouse-test_*.deb; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/lib/llvm-9/bin/llvm-symbolizer /usr/bin/llvm-symbolizer; \
echo "TSAN_OPTIONS='halt_on_error=1 history_size=7 ignore_noninstrumented_modules=1 verbosity=1'" >> /etc/environment; \

View File

@ -688,10 +688,77 @@ auto s = std::string{"Hello"};
## Неиспользуемые возможности языка C++ {#neispolzuemye-vozmozhnosti-iazyka-c}
**1.** Виртуальное наследование не используется.
**2.** Спецификаторы исключений из C++03 не используются.
## Сообщения об ошибках {#error-messages}
Сообщения об ошибках -- это часть пользовательского интерфейса программы, предназначенная для того, чтобы позволить пользователю:
* замечать ошибочные ситуации,
* понимать их смысл и причины,
* устранять эти ситуации.
Форма и содержание сообщений об ошибках должны способствовать достижению этих целей.
Есть два основных вида ошибок:
* пользовательская или системная ошибка,
* внутренняя программная ошибка.
### Пользовательская ошибка {#error-messages-user-error}
Такая ошибка вызвана действиями пользователя (неверный синтаксис запроса) или конфигурацией внешних систем (кончилось место на диске). Предполагается, что пользователь может устранить её самостоятельно. Для этого в сообщении об ошибке должна содержаться следующая информация:
* что произошло. Это должно объясняться в пользовательских терминах (`Function pow() is not supported for data type UInt128`), а не загадочными конструкциями из кода (`runtime overload resolution failed in DB::BinaryOperationBuilder<FunctionAdaptor<pow>::Impl, UInt128, Int8>::kaboongleFastPath()`).
* почему/где/когда -- любой контекст, который помогает отладить проблему. Представьте, как бы её отлаживали вы (программировать и пользоваться отладчиком нельзя).
* что можно предпринять для устранения ошибки. Здесь можно перечислить типичные причины проблемы, настройки, влияющие на это поведение, и так далее.
Пример нормального сообщения:
```
No alias for subquery or table function in JOIN (set joined_subquery_requires_alias=0 to disable restriction).
While processing '(SELECT 2 AS a)'.
```
Сказано что не хватает алиаса, показано, для какой части запроса, и предложена настройка, позволяющая ослабить это требование.
Пример катастрофически плохого сообщения:
```
The dictionary is configured incorrectly.
```
Из него не понятно:
- какой словарь?
- в чём ошибка конфигурации?
Что может сделать пользователь в такой ситуации: применять внешние отладочные инструменты, спрашивать совета на форумах, гадать на кофейной гуще, и, конечно же, ненавидеть софт, который над ним так издевается. Не нужно издеваться над пользователями, это плохой UX.
### Внутренняя программная ошибка {#error-messages-internal-error}
Такая ошибка вызвана нарушением внутренних инвариантов программы: например, внутренняя функция вызвана с неверными параметрами, не совпадают размеры колонок в блоке, произошло разыменование нулевого указателя, и так далее. Сигналы типа `SIGSEGV` относятся к этой же категории.
Появление такой ошибки всегда свидетельствует о наличии бага в программе. Пользователь не может исправить такую ошибку самостоятельно, и должен сообщить о ней разработчикам.
Есть два основных варианта проверки на такие ошибки:
* Исключение с кодом `LOGICAL_ERROR`. Его можно использовать для важных проверок, которые делаются в том числе в релизной сборке.
* `assert`. Такие условия не проверяются в релизной сборке, можно использовать для тяжёлых и опциональных проверок.
Пример сообщения, у которого должен быть код `LOGICAL_ERROR`:
`Block header is inconsistent with Chunk in ICompicatedProcessor::munge(). It is a bug!`
По каким признакам можно заметить, что здесь говорится о внутренней программной ошибке?
* в сообщении упоминаются внутренние сущности из кода,
* в сообщении написано it's a bug,
* непосредственные действия пользователя не могут исправить эту ошибку. Мы ожидаем, что пользователь зарепортит её как баг, и будем исправлять в коде.
### Как выбрать код ошибки? {#error-messages-choose}
Код ошибки предназначен для автоматической обработки некоторых видов ошибок, подобно кодам HTTP. SQL стандартизирует некоторые коды, но на деле ClickHouse не всегда соответствует этим стандартам. Лучше всего выбрать существующий код из `ErrorCodes.cpp`, который больше всего подходит по смыслу. Можно использовать общие коды типа `BAD_ARGUMENTS` или `TYPE_MISMATCH`. Заводить новый код нужно, только если вы чётко понимаете, что вам нужна специальная автоматическая обработка конкретно этой ошибки на клиенте. Для внутренних программных ошибок используется код `LOGICAL_ERROR`.
### Как добавить новое сообщение об ошибке? {#error-messages-add}
Когда добавляете сообщение об ошибке:
1. Опишите, что произошло, в пользовательских терминах, а не кусками кода.
2. Добавьте максимум контекста (с чем произошло, когда, почему, и т.д.).
3. Добавьте типичные причины.
4. Добавьте варианты исправления (настройки, ссылки на документацию).
5. Вообразите дальнейшие действия пользователя. Ваше сообщение должно помочь ему решить проблему без использования отладочных инструментов и без чужой помощи.
6. Если сообщение об ошибке не формулируется в пользовательских терминах, и действия пользователя не могут исправить проблему -- это внутренняя программная ошибка, используйте код LOGICAL_ERROR или assert.
## Платформа {#platforma}
**1.** Мы пишем код под конкретные платформы.

View File

@ -191,7 +191,7 @@
<!-- Path to folder where users and roles created by SQL commands are stored. -->
<access_control_path>/var/lib/clickhouse/access/</access_control_path>
<!-- Path to configuration file with users, access rights, profiles of settings, quotas. -->
<users_config>users.xml</users_config>
@ -405,9 +405,6 @@
</prometheus>
-->
<!-- Lazy system.*_log table creation -->
<!-- <system_tables_lazy_load>false</system_tables_lazy_load> -->
<!-- Query log. Used only for queries with setting log_queries = 1. -->
<query_log>
<!-- What table to insert data. If table is not exist, it will be created.

View File

@ -333,6 +333,17 @@ void ColumnDecimal<T>::getExtremes(Field & min, Field & max) const
max = NearestFieldType<T>(cur_max, scale);
}
TypeIndex columnDecimalDataType(const IColumn * column)
{
if (checkColumn<ColumnDecimal<Decimal32>>(column))
return TypeIndex::Decimal32;
else if (checkColumn<ColumnDecimal<Decimal64>>(column))
return TypeIndex::Decimal64;
else if (checkColumn<ColumnDecimal<Decimal128>>(column))
return TypeIndex::Decimal128;
return TypeIndex::Nothing;
}
template class ColumnDecimal<Decimal32>;
template class ColumnDecimal<Decimal64>;
template class ColumnDecimal<Decimal128>;

View File

@ -197,4 +197,6 @@ ColumnPtr ColumnDecimal<T>::indexImpl(const PaddedPODArray<Type> & indexes, size
return res;
}
TypeIndex columnDecimalDataType(const IColumn * column);
}

View File

@ -517,6 +517,33 @@ void ColumnVector<T>::getExtremes(Field & min, Field & max) const
max = NearestFieldType<T>(cur_max);
}
TypeIndex columnVectorDataType(const IColumn * column)
{
if (checkColumn<ColumnVector<UInt8>>(column))
return TypeIndex::UInt8;
else if (checkColumn<ColumnVector<UInt16>>(column))
return TypeIndex::UInt16;
else if (checkColumn<ColumnVector<UInt32>>(column))
return TypeIndex::UInt32;
else if (checkColumn<ColumnVector<UInt64>>(column))
return TypeIndex::UInt64;
else if (checkColumn<ColumnVector<Int8>>(column))
return TypeIndex::Int8;
else if (checkColumn<ColumnVector<Int16>>(column))
return TypeIndex::Int16;
else if (checkColumn<ColumnVector<Int32>>(column))
return TypeIndex::Int32;
else if (checkColumn<ColumnVector<Int64>>(column))
return TypeIndex::Int64;
else if (checkColumn<ColumnVector<Int128>>(column))
return TypeIndex::Int128;
else if (checkColumn<ColumnVector<Float32>>(column))
return TypeIndex::Float32;
else if (checkColumn<ColumnVector<Float64>>(column))
return TypeIndex::Float64;
return TypeIndex::Nothing;
}
/// Explicit template instantiations - to avoid code bloat in headers.
template class ColumnVector<UInt8>;
template class ColumnVector<UInt16>;

View File

@ -320,4 +320,6 @@ ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_
return res;
}
TypeIndex columnVectorDataType(const IColumn * column);
}

View File

@ -23,10 +23,20 @@ namespace
{
StoragePtr createStorageDictionary(const String & database_name, const ExternalLoader::LoadResult & load_result)
{
if (!load_result.config)
return nullptr;
DictionaryStructure dictionary_structure = ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
return StorageDictionary::create(StorageID(database_name, load_result.name), load_result.name, dictionary_structure);
try
{
if (!load_result.config)
return nullptr;
DictionaryStructure dictionary_structure = ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
return StorageDictionary::create(StorageID(database_name, load_result.name), load_result.name, dictionary_structure);
}
catch (Exception & e)
{
throw Exception(
fmt::format("Error while loading dictionary '{}.{}': {}",
database_name, load_result.name, e.displayText()),
e.code());
}
}
}

View File

@ -192,7 +192,7 @@ public:
ASTTableJoin::Kind getKind() const { return kind; }
ASTTableJoin::Strictness getStrictness() const { return strictness; }
AsofRowRefs::Type getAsofType() const { return *asof_type; }
TypeIndex getAsofType() const { return *asof_type; }
ASOF::Inequality getAsofInequality() const { return asof_inequality; }
bool anyTakeLastRow() const { return any_take_last_row; }
@ -344,7 +344,7 @@ private:
bool nullable_right_side; /// In case of LEFT and FULL joins, if use_nulls, convert right-side columns to Nullable.
bool nullable_left_side; /// In case of RIGHT and FULL joins, if use_nulls, convert left-side columns to Nullable.
bool any_take_last_row; /// Overwrite existing values when encountering the same key again
std::optional<AsofRowRefs::Type> asof_type;
std::optional<TypeIndex> asof_type;
ASOF::Inequality asof_inequality;
/// Right table data. StorageJoin shares it between many Join objects.

View File

@ -17,19 +17,25 @@ namespace
/// maps enum values to types
template <typename F>
void callWithType(AsofRowRefs::Type which, F && f)
void callWithType(TypeIndex which, F && f)
{
switch (which)
{
case AsofRowRefs::Type::keyu32: return f(UInt32());
case AsofRowRefs::Type::keyu64: return f(UInt64());
case AsofRowRefs::Type::keyi32: return f(Int32());
case AsofRowRefs::Type::keyi64: return f(Int64());
case AsofRowRefs::Type::keyf32: return f(Float32());
case AsofRowRefs::Type::keyf64: return f(Float64());
case AsofRowRefs::Type::keyDecimal32: return f(Decimal32());
case AsofRowRefs::Type::keyDecimal64: return f(Decimal64());
case AsofRowRefs::Type::keyDecimal128: return f(Decimal128());
case TypeIndex::UInt8: return f(UInt8());
case TypeIndex::UInt16: return f(UInt16());
case TypeIndex::UInt32: return f(UInt32());
case TypeIndex::UInt64: return f(UInt64());
case TypeIndex::Int8: return f(Int8());
case TypeIndex::Int16: return f(Int16());
case TypeIndex::Int32: return f(Int32());
case TypeIndex::Int64: return f(Int64());
case TypeIndex::Float32: return f(Float32());
case TypeIndex::Float64: return f(Float64());
case TypeIndex::Decimal32: return f(Decimal32());
case TypeIndex::Decimal64: return f(Decimal64());
case TypeIndex::Decimal128: return f(Decimal128());
default:
break;
}
__builtin_unreachable();
@ -38,7 +44,7 @@ void callWithType(AsofRowRefs::Type which, F && f)
}
AsofRowRefs::AsofRowRefs(Type type)
AsofRowRefs::AsofRowRefs(TypeIndex type)
{
auto call = [&](const auto & t)
{
@ -50,7 +56,7 @@ AsofRowRefs::AsofRowRefs(Type type)
callWithType(type, call);
}
void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num)
void AsofRowRefs::insert(TypeIndex type, const IColumn * asof_column, const Block * block, size_t row_num)
{
auto call = [&](const auto & t)
{
@ -68,7 +74,7 @@ void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * b
callWithType(type, call);
}
const RowRef * AsofRowRefs::findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const
const RowRef * AsofRowRefs::findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const
{
const RowRef * out = nullptr;
@ -96,52 +102,56 @@ const RowRef * AsofRowRefs::findAsof(Type type, ASOF::Inequality inequality, con
return out;
}
std::optional<AsofRowRefs::Type> AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size)
std::optional<TypeIndex> AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size)
{
if (typeid_cast<const ColumnVector<UInt32> *>(asof_column))
TypeIndex idx = columnVectorDataType(asof_column);
if (idx == TypeIndex::Nothing)
idx = columnDecimalDataType(asof_column);
switch (idx)
{
size = sizeof(UInt32);
return Type::keyu32;
}
else if (typeid_cast<const ColumnVector<UInt64> *>(asof_column))
{
size = sizeof(UInt64);
return Type::keyu64;
}
else if (typeid_cast<const ColumnVector<Int32> *>(asof_column))
{
size = sizeof(Int32);
return Type::keyi32;
}
else if (typeid_cast<const ColumnVector<Int64> *>(asof_column))
{
size = sizeof(Int64);
return Type::keyi64;
}
else if (typeid_cast<const ColumnVector<Float32> *>(asof_column))
{
size = sizeof(Float32);
return Type::keyf32;
}
else if (typeid_cast<const ColumnVector<Float64> *>(asof_column))
{
size = sizeof(Float64);
return Type::keyf64;
}
else if (typeid_cast<const ColumnDecimal<Decimal32> *>(asof_column))
{
size = sizeof(Decimal32);
return Type::keyDecimal32;
}
else if (typeid_cast<const ColumnDecimal<Decimal64> *>(asof_column))
{
size = sizeof(Decimal64);
return Type::keyDecimal64;
}
else if (typeid_cast<const ColumnDecimal<Decimal128> *>(asof_column))
{
size = sizeof(Decimal128);
return Type::keyDecimal128;
case TypeIndex::UInt8:
size = sizeof(UInt8);
return idx;
case TypeIndex::UInt16:
size = sizeof(UInt16);
return idx;
case TypeIndex::UInt32:
size = sizeof(UInt32);
return idx;
case TypeIndex::UInt64:
size = sizeof(UInt64);
return idx;
case TypeIndex::Int8:
size = sizeof(Int8);
return idx;
case TypeIndex::Int16:
size = sizeof(Int16);
return idx;
case TypeIndex::Int32:
size = sizeof(Int32);
return idx;
case TypeIndex::Int64:
size = sizeof(Int64);
return idx;
//case TypeIndex::Int128:
case TypeIndex::Float32:
size = sizeof(Float32);
return idx;
case TypeIndex::Float64:
size = sizeof(Float64);
return idx;
case TypeIndex::Decimal32:
size = sizeof(Decimal32);
return idx;
case TypeIndex::Decimal64:
size = sizeof(Decimal64);
return idx;
case TypeIndex::Decimal128:
size = sizeof(Decimal128);
return idx;
default:
break;
}
size = 0;

View File

@ -216,8 +216,12 @@ public:
};
using Lookups = std::variant<
Entry<UInt8>::LookupPtr,
Entry<UInt16>::LookupPtr,
Entry<UInt32>::LookupPtr,
Entry<UInt64>::LookupPtr,
Entry<Int8>::LookupPtr,
Entry<Int16>::LookupPtr,
Entry<Int32>::LookupPtr,
Entry<Int64>::LookupPtr,
Entry<Float32>::LookupPtr,
@ -226,29 +230,16 @@ public:
Entry<Decimal64>::LookupPtr,
Entry<Decimal128>::LookupPtr>;
enum class Type
{
keyu32,
keyu64,
keyi32,
keyi64,
keyf32,
keyf64,
keyDecimal32,
keyDecimal64,
keyDecimal128,
};
AsofRowRefs() {}
AsofRowRefs(Type t);
AsofRowRefs(TypeIndex t);
static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);
static std::optional<TypeIndex> getTypeSize(const IColumn * asof_column, size_t & type_size);
// This will be synchronized by the rwlock mutex in Join.h
void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);
void insert(TypeIndex type, const IColumn * asof_column, const Block * block, size_t row_num);
// This will internally synchronize
const RowRef * findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;
const RowRef * findAsof(TypeIndex type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;
private:
// Lookups can be stored in a HashTable because it is memmovable

View File

@ -89,16 +89,10 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
if (metric_log)
logs.emplace_back(metric_log.get());
bool lazy_load = config.getBool("system_tables_lazy_load", false);
try
{
for (auto & log : logs)
{
if (!lazy_load)
log->prepareTable();
log->startup();
}
}
catch (...)
{

View File

@ -19,9 +19,8 @@ KafkaBlockInputStream::KafkaBlockInputStream(
, column_names(columns)
, max_block_size(max_block_size_)
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support
, virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}))
, non_virtual_header(storage.getSampleBlockNonMaterialized())
, virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp","_timestamp_ms","_headers.name","_headers.value"}))
{
context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_allow_errors_ratio", 0.);
@ -141,8 +140,22 @@ Block KafkaBlockInputStream::readImpl()
auto offset = buffer->currentOffset();
auto partition = buffer->currentPartition();
auto timestamp_raw = buffer->currentTimestamp();
auto timestamp = timestamp_raw ? std::chrono::duration_cast<std::chrono::seconds>(timestamp_raw->get_timestamp()).count()
: 0;
auto header_list = buffer->currentHeaderList();
Array headers_names;
Array headers_values;
if (!header_list.empty())
{
headers_names.reserve(header_list.size());
headers_values.reserve(header_list.size());
for (const auto & header : header_list)
{
headers_names.emplace_back(header.get_name());
headers_values.emplace_back(static_cast<std::string>(header.get_value()));
}
}
for (size_t i = 0; i < new_rows; ++i)
{
virtual_columns[0]->insert(topic);
@ -151,12 +164,17 @@ Block KafkaBlockInputStream::readImpl()
virtual_columns[3]->insert(partition);
if (timestamp_raw)
{
virtual_columns[4]->insert(timestamp);
auto ts = timestamp_raw->get_timestamp();
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(ts).count());
virtual_columns[5]->insert(DecimalField<Decimal64>(std::chrono::duration_cast<std::chrono::milliseconds>(ts).count(),3));
}
else
{
virtual_columns[4]->insertDefault();
virtual_columns[5]->insertDefault();
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
}
total_rows = total_rows + new_rows;

View File

@ -49,6 +49,7 @@ public:
auto currentOffset() const { return current[-1].get_offset(); }
auto currentPartition() const { return current[-1].get_partition(); }
auto currentTimestamp() const { return current[-1].get_timestamp(); }
const auto & currentHeaderList() const { return current[-1].get_header_list(); }
private:
using Messages = std::vector<cppkafka::Message>;

View File

@ -6,9 +6,11 @@
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
@ -747,7 +749,10 @@ NamesAndTypesList StorageKafka::getVirtuals() const
{"_key", std::make_shared<DataTypeString>()},
{"_offset", std::make_shared<DataTypeUInt64>()},
{"_partition", std::make_shared<DataTypeUInt64>()},
{"_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())}
{"_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
{"_timestamp_ms", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime64>(3))},
{"_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}
};
}

View File

@ -1,7 +0,0 @@
<yandex>
<profiles>
<default>
<log_queries>1</log_queries>
</default>
</profiles>
</yandex>

View File

@ -1,8 +0,0 @@
<yandex>
<metric_log>
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
</yandex>

View File

@ -24,18 +24,16 @@ def test_config_without_part_log(start_cluster):
node1.query("SYSTEM FLUSH LOGS")
assert "Table system.part_log doesn't exist" in node1.query_and_get_error("SELECT * FROM system.part_log")
# Note: if part_log is defined, we cannot say when the table will be created - because of metric_log, trace_log, text_log, query_log...
def test_config_with_standard_part_log(start_cluster):
assert node2.query("SELECT * FROM system.part_log") == ''
node2.query("CREATE TABLE test_table(word String, value UInt64) ENGINE=MergeTree() Order by value")
assert node2.query("SELECT * FROM system.part_log") == ''
node2.query("INSERT INTO test_table VALUES ('name', 1)")
node2.query("SYSTEM FLUSH LOGS")
assert int(node2.query("SELECT count() FROM system.part_log")) == 1
assert node2.query("SELECT * FROM system.part_log") != ""
def test_config_with_non_standard_part_log(start_cluster):
assert node3.query("SELECT * FROM system.own_part_log") == ''
node3.query("CREATE TABLE test_table(word String, value UInt64) ENGINE=MergeTree() Order by value")
assert node3.query("SELECT * FROM system.own_part_log") == ''
node3.query("INSERT INTO test_table VALUES ('name', 1)")
node3.query("SYSTEM FLUSH LOGS")
assert int(node3.query("SELECT count() FROM system.own_part_log")) == 1
assert node3.query("SELECT * FROM system.own_part_log") != ""

View File

@ -876,28 +876,28 @@ def test_kafka_virtual_columns2(kafka_cluster):
kafka_format = 'JSONEachRow';
CREATE MATERIALIZED VIEW test.view Engine=Log AS
SELECT value, _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp) FROM test.kafka;
SELECT value, _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp), toUnixTimestamp64Milli(_timestamp_ms), _headers.name, _headers.value FROM test.kafka;
''')
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send(topic='virt2_0', value=json.dumps({'value': 1}), partition=0, key='k1', timestamp_ms=1577836801000)
producer.send(topic='virt2_0', value=json.dumps({'value': 2}), partition=0, key='k2', timestamp_ms=1577836802000)
producer.send(topic='virt2_0', value=json.dumps({'value': 1}), partition=0, key='k1', timestamp_ms=1577836801001, headers=[('content-encoding', b'base64')])
producer.send(topic='virt2_0', value=json.dumps({'value': 2}), partition=0, key='k2', timestamp_ms=1577836802002, headers=[('empty_value', ''),('', 'empty name'), ('',''), ('repetition', '1'), ('repetition', '2')])
producer.flush()
time.sleep(1)
producer.send(topic='virt2_0', value=json.dumps({'value': 3}), partition=1, key='k3', timestamp_ms=1577836803000)
producer.send(topic='virt2_0', value=json.dumps({'value': 4}), partition=1, key='k4', timestamp_ms=1577836804000)
producer.send(topic='virt2_0', value=json.dumps({'value': 3}), partition=1, key='k3', timestamp_ms=1577836803003, headers=[('b', 'b'),('a', 'a')])
producer.send(topic='virt2_0', value=json.dumps({'value': 4}), partition=1, key='k4', timestamp_ms=1577836804004, headers=[('a', 'a'),('b', 'b')])
producer.flush()
time.sleep(1)
producer.send(topic='virt2_1', value=json.dumps({'value': 5}), partition=0, key='k5', timestamp_ms=1577836805000)
producer.send(topic='virt2_1', value=json.dumps({'value': 6}), partition=0, key='k6', timestamp_ms=1577836806000)
producer.send(topic='virt2_1', value=json.dumps({'value': 5}), partition=0, key='k5', timestamp_ms=1577836805005)
producer.send(topic='virt2_1', value=json.dumps({'value': 6}), partition=0, key='k6', timestamp_ms=1577836806006)
producer.flush()
time.sleep(1)
producer.send(topic='virt2_1', value=json.dumps({'value': 7}), partition=1, key='k7', timestamp_ms=1577836807000)
producer.send(topic='virt2_1', value=json.dumps({'value': 8}), partition=1, key='k8', timestamp_ms=1577836808000)
producer.send(topic='virt2_1', value=json.dumps({'value': 7}), partition=1, key='k7', timestamp_ms=1577836807007)
producer.send(topic='virt2_1', value=json.dumps({'value': 8}), partition=1, key='k8', timestamp_ms=1577836808008)
producer.flush()
time.sleep(10)
@ -910,14 +910,14 @@ def test_kafka_virtual_columns2(kafka_cluster):
result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)
expected = '''\
1 k1 virt2_0 0 0 1577836801
2 k2 virt2_0 0 1 1577836802
3 k3 virt2_0 1 0 1577836803
4 k4 virt2_0 1 1 1577836804
5 k5 virt2_1 0 0 1577836805
6 k6 virt2_1 0 1 1577836806
7 k7 virt2_1 1 0 1577836807
8 k8 virt2_1 1 1 1577836808
1 k1 virt2_0 0 0 1577836801 1577836801001 ['content-encoding'] ['base64']
2 k2 virt2_0 0 1 1577836802 1577836802002 ['empty_value','','','repetition','repetition'] ['','empty name','','1','2']
3 k3 virt2_0 1 0 1577836803 1577836803003 ['b','a'] ['b','a']
4 k4 virt2_0 1 1 1577836804 1577836804004 ['a','b'] ['a','b']
5 k5 virt2_1 0 0 1577836805 1577836805005 [] []
6 k6 virt2_1 0 1 1577836806 1577836806006 [] []
7 k7 virt2_1 1 0 1577836807 1577836807007 [] []
8 k8 virt2_1 1 1 1577836808 1577836808008 [] []
'''
assert TSV(result) == TSV(expected)

View File

@ -1,4 +0,0 @@
<?xml version="1.0"?>
<yandex>
<system_tables_lazy_load>true</system_tables_lazy_load>
</yandex>

View File

@ -1,32 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node_default = cluster.add_instance('node_default')
# main_configs is mandatory ,since system_tables_lazy_load will be read earlier then parsing of config_lazy.xml
node_lazy = cluster.add_instance('node_lazy', config_dir='configs', main_configs=['configs/config_lazy.xml'])
system_logs = [
# disabled by default
# ('system.part_log'),
# ('system.text_log'),
# enabled by default
('system.query_log'),
('system.query_thread_log'),
('system.trace_log'),
('system.metric_log'),
]
@pytest.fixture(scope='module')
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.mark.parametrize('table', system_logs)
def test_system_table(start_cluster, table):
node_default.query('SELECT * FROM {}'.format(table))
assert "Table {} doesn't exist".format(table) in node_lazy.query_and_get_error('SELECT * FROM {}'.format(table))

View File

@ -0,0 +1,13 @@
0 1
0 1
0 1
0 1
0 1
0 1
0 1
0 1
0 1
0 1
0 1
0 0000-00-00
0 0000-00-00 00:00:00

View File

@ -0,0 +1,18 @@
select * from (select 0 as k, toInt8(1) as v) t1 asof join (select 0 as k, toInt8(0) as v) t2 using(k, v);
select * from (select 0 as k, toInt16(1) as v) t1 asof join (select 0 as k, toInt16(0) as v) t2 using(k, v);
select * from (select 0 as k, toInt32(1) as v) t1 asof join (select 0 as k, toInt32(0) as v) t2 using(k, v);
select * from (select 0 as k, toInt64(1) as v) t1 asof join (select 0 as k, toInt64(0) as v) t2 using(k, v);
select * from (select 0 as k, toUInt8(1) as v) t1 asof join (select 0 as k, toUInt8(0) as v) t2 using(k, v);
select * from (select 0 as k, toUInt16(1) as v) t1 asof join (select 0 as k, toUInt16(0) as v) t2 using(k, v);
select * from (select 0 as k, toUInt32(1) as v) t1 asof join (select 0 as k, toUInt32(0) as v) t2 using(k, v);
select * from (select 0 as k, toUInt64(1) as v) t1 asof join (select 0 as k, toUInt64(0) as v) t2 using(k, v);
select * from (select 0 as k, toDecimal32(1, 0) as v) t1 asof join (select 0 as k, toDecimal32(0, 0) as v) t2 using(k, v);
select * from (select 0 as k, toDecimal64(1, 0) as v) t1 asof join (select 0 as k, toDecimal64(0, 0) as v) t2 using(k, v);
select * from (select 0 as k, toDecimal128(1, 0) as v) t1 asof join (select 0 as k, toDecimal128(0, 0) as v) t2 using(k, v);
select * from (select 0 as k, toDate(0) as v) t1 asof join (select 0 as k, toDate(0) as v) t2 using(k, v);
select * from (select 0 as k, toDateTime(0) as v) t1 asof join (select 0 as k, toDateTime(0) as v) t2 using(k, v);
select * from (select 0 as k, 'x' as v) t1 asof join (select 0 as k, 'x' as v) t2 using(k, v); -- { serverError 169 }