Merge pull request #5382 from abyss7/issue-5286

Add virtual columns to Kafka Engine tables
Authored by alexey-milovidov on 2019-07-01 21:30:09 +03:00; committed by GitHub
commit c43dfce041
46 changed files with 639 additions and 364 deletions
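
For context before the per-file diffs: this change exposes _topic (String), _key (String) and _offset (UInt64) as virtual columns of Kafka engine tables (they are registered in the StorageKafka constructor below) and carries them into the blocks streamed to dependent materialized views. A minimal usage sketch follows; the table, broker, topic and group names are purely illustrative and not part of this commit:

    -- Hypothetical table; broker/topic/group settings are placeholders.
    CREATE TABLE test.queue (key UInt64, value UInt64)
        ENGINE = Kafka
        SETTINGS kafka_broker_list = 'kafka1:19092',
                 kafka_topic_list = 'topic',
                 kafka_group_name = 'group',
                 kafka_format = 'JSONEachRow';

    -- The virtual columns are not declared in the table definition,
    -- but can be selected next to the real ones:
    SELECT _topic, _key, _offset, key, value FROM test.queue;

    -- They are also present in the blocks pushed to dependent views
    -- (the no_destination insert path changed below), so a materialized
    -- view can persist them as ordinary columns:
    CREATE MATERIALIZED VIEW test.consumer
        ENGINE = MergeTree() ORDER BY key AS
        SELECT key, value, _topic, _offset FROM test.queue;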

View File

@ -336,6 +336,7 @@ MutableColumns Block::mutateColumns()
void Block::setColumns(MutableColumns && columns)
{
/// TODO: assert if |columns| doesn't match |data|!
size_t num_columns = data.size();
for (size_t i = 0; i < num_columns; ++i)
data[i].column = std::move(columns[i]);
@ -344,6 +345,7 @@ void Block::setColumns(MutableColumns && columns)
void Block::setColumns(const Columns & columns)
{
/// TODO: assert if |columns| doesn't match |data|!
size_t num_columns = data.size();
for (size_t i = 0; i < num_columns; ++i)
data[i].column = columns[i];

View File

@ -1,8 +1,8 @@
#pragma once
#include <cstdint>
#include <string>
#include <vector>
#include <cstdint>
namespace DB

View File

@ -60,7 +60,7 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
if (input_header.has(res_elem.name))
conversion[result_col_num] = input_header.getPositionByName(res_elem.name);
else
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
break;
}

View File

@ -12,7 +12,7 @@ namespace DB
class OneBlockInputStream : public IBlockInputStream
{
public:
OneBlockInputStream(const Block & block_) : block(block_) {}
explicit OneBlockInputStream(const Block & block_) : block(block_) {}
String getName() const override { return "One"; }

View File

@ -63,6 +63,17 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
}
Block PushingToViewsBlockOutputStream::getHeader() const
{
/// If we don't write directly to the destination
/// then expect that we're inserting with precalculated virtual columns
if (output)
return storage->getSampleBlock();
else
return storage->getSampleBlockWithVirtuals();
}
void PushingToViewsBlockOutputStream::write(const Block & block)
{
/** Throw an exception if the sizes of arrays - elements of nested data structures - don't match.
@ -73,6 +84,8 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
Nested::validateArraySizes(block);
if (output)
/// TODO: to support virtual and alias columns inside MVs, we should return here the inserted block extended
/// with additional columns directly from storage and pass it to MVs instead of raw block.
output->write(block);
/// Don't process materialized views if this block is duplicate

View File

@ -22,7 +22,7 @@ public:
const String & database, const String & table, const StoragePtr & storage_,
const Context & context_, const ASTPtr & query_ptr_, bool no_destination = false);
Block getHeader() const override { return storage->getSampleBlock(); }
Block getHeader() const override;
void write(const Block & block) override;
void flush() override;

View File

@ -65,11 +65,12 @@ void registerInputFormatRowBinary(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<BinaryRowInputStream>(buf, sample, false, false),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
factory.registerInputFormat("RowBinaryWithNamesAndTypes", [](
@ -78,11 +79,12 @@ void registerInputFormatRowBinary(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<BinaryRowInputStream>(buf, sample, true, true),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -28,9 +28,15 @@ BlockInputStreamFromRowInputStream::BlockInputStreamFromRowInputStream(
const Block & sample_,
UInt64 max_block_size_,
UInt64 rows_portion_size_,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
: row_input(row_input_), sample(sample_), max_block_size(max_block_size_), rows_portion_size(rows_portion_size_),
allow_errors_num(settings.input_allow_errors_num), allow_errors_ratio(settings.input_allow_errors_ratio)
: row_input(row_input_)
, sample(sample_)
, max_block_size(max_block_size_)
, rows_portion_size(rows_portion_size_)
, read_virtual_columns_callback(callback)
, allow_errors_num(settings.input_allow_errors_num)
, allow_errors_ratio(settings.input_allow_errors_ratio)
{
}
@ -73,6 +79,8 @@ Block BlockInputStreamFromRowInputStream::readImpl()
RowReadExtension info;
if (!row_input->read(columns, info))
break;
if (read_virtual_columns_callback)
read_virtual_columns_callback();
for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
{

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h>
#include <DataStreams/IBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowInputStream.h>
@ -24,6 +25,7 @@ public:
const Block & sample_,
UInt64 max_block_size_,
UInt64 rows_portion_size_,
FormatFactory::ReadCallback callback,
const FormatSettings & settings);
void readPrefix() override { row_input->readPrefix(); }
@ -45,6 +47,10 @@ private:
Block sample;
UInt64 max_block_size;
UInt64 rows_portion_size;
/// Callback used to set up virtual columns after reading each row.
FormatFactory::ReadCallback read_virtual_columns_callback;
BlockMissingValues block_missing_values;
UInt64 allow_errors_num;

View File

@ -531,11 +531,12 @@ void registerInputFormatCSV(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<CSVRowInputStream>(buf, sample, with_names, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}
}

View File

@ -308,6 +308,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
@ -315,6 +316,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
sample,
max_block_size,
rows_portion_size,
callback,
settings);
});
}

View File

@ -27,7 +27,14 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
}
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size) const
BlockInputStreamPtr FormatFactory::getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback) const
{
const auto & input_getter = getCreators(name).first;
if (!input_getter)
@ -48,7 +55,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
return input_getter(buf, sample, context, max_block_size, rows_portion_size, format_settings);
return input_getter(
buf, sample, context, max_block_size, rows_portion_size, callback ? callback : ReadCallback(), format_settings);
}

View File

@ -24,6 +24,11 @@ class WriteBuffer;
*/
class FormatFactory final : public ext::singleton<FormatFactory>
{
public:
/// This callback allows performing some additional actions after reading a single row.
/// Its initial purpose was to extract the payload for virtual columns from the Kafka consumer ReadBuffer.
using ReadCallback = std::function<void()>;
private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
@ -31,6 +36,7 @@ private:
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback,
const FormatSettings & settings)>;
using OutputCreator = std::function<BlockOutputStreamPtr(
@ -44,8 +50,14 @@ private:
using FormatsDictionary = std::unordered_map<String, Creators>;
public:
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size = 0) const;
BlockInputStreamPtr getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size = 0,
ReadCallback callback = {}) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;

View File

@ -260,11 +260,12 @@ void registerInputFormatJSONEachRow(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -14,6 +14,7 @@ void registerInputFormatNative(FormatFactory & factory)
const Context &,
UInt64 /* max_block_size */,
UInt64 /* min_read_rows */,
FormatFactory::ReadCallback /* callback */,
const FormatSettings &)
{
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);

View File

@ -476,6 +476,7 @@ void registerInputFormatParquet(FormatFactory & factory)
const Context & context,
UInt64 /* max_block_size */,
UInt64 /* rows_portion_size */,
FormatFactory::ReadCallback /* callback */,
const FormatSettings & /* settings */) { return std::make_shared<ParquetBlockInputStream>(buf, sample, context); });
}

View File

@ -74,11 +74,12 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "Protobuf")),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -199,11 +199,12 @@ void registerInputFormatTSKV(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TSKVRowInputStream>(buf, sample, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -457,11 +457,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, false, false, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}
@ -473,11 +474,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, false, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}
@ -489,11 +491,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, true, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}
}

View File

@ -156,11 +156,12 @@ void registerInputFormatValues(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::ReadCallback callback,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ValuesRowInputStream>(buf, sample, context, settings),
sample, max_block_size, rows_portion_size, settings);
sample, max_block_size, rows_portion_size, callback, settings);
});
}

View File

@ -45,7 +45,7 @@ try
FormatSettings format_settings;
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample);

View File

@ -42,7 +42,7 @@ try
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample);
copyData(block_input, block_output);

View File

@ -57,8 +57,6 @@ StoragePtr InterpreterInsertQuery::getTable(const ASTInsertQuery & query)
Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table)
{
Block table_sample_non_materialized = table->getSampleBlockNonMaterialized();
/// If the query does not include information about columns
if (!query.columns)
@ -66,6 +64,8 @@ Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const
/// Format Native ignores header and write blocks as is.
if (query.format == "Native")
return {};
else if (query.no_destination)
return table->getSampleBlockWithVirtuals();
else
return table_sample_non_materialized;
}
@ -108,14 +108,14 @@ BlockIO InterpreterInsertQuery::execute()
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()))
{
out = std::make_shared<SquashingBlockOutputStream>(
out, table->getSampleBlock(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
out, out->getHeader(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
}
auto query_sample_block = getSampleBlock(query, table);
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, table->getSampleBlock(), table->getColumns().getDefaults(), context);
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());

View File

@ -76,7 +76,9 @@ void collectSourceColumns(const ASTSelectQuery * select_query, StoragePtr storag
if (select_query)
{
const auto & storage_aliases = storage->getColumns().getAliases();
const auto & storage_virtuals = storage->getColumns().getVirtuals();
source_columns.insert(source_columns.end(), storage_aliases.begin(), storage_aliases.end());
source_columns.insert(source_columns.end(), storage_virtuals.begin(), storage_virtuals.end());
}
}
}

View File

@ -182,7 +182,7 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
{
if (type == ADD_COLUMN)
{
ColumnDescription column(column_name, data_type);
ColumnDescription column(column_name, data_type, false);
if (default_expression)
{
column.default_desc.kind = default_kind;
@ -388,8 +388,8 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
column_to_command_idx[column_name] = i;
/// we're creating dummy DataTypeUInt8 in order to prevent the NullPointerException in ExpressionActions
columns.add(ColumnDescription(
column_name, command.data_type ? command.data_type : std::make_shared<DataTypeUInt8>()));
columns.add(
ColumnDescription(column_name, command.data_type ? command.data_type : std::make_shared<DataTypeUInt8>(), false));
if (command.default_expression)
{

View File

@ -32,6 +32,11 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_TEXT;
}
ColumnDescription::ColumnDescription(String name_, DataTypePtr type_, bool is_virtual_)
: name(std::move(name_)), type(std::move(type_)), is_virtual(is_virtual_)
{
}
bool ColumnDescription::operator==(const ColumnDescription & other) const
{
auto codec_str = [](const CompressionCodecPtr & codec_ptr) { return codec_ptr ? codec_ptr->getCodecDesc() : String(); };
@ -115,10 +120,10 @@ void ColumnDescription::readText(ReadBuffer & buf)
}
ColumnsDescription::ColumnsDescription(NamesAndTypesList ordinary)
ColumnsDescription::ColumnsDescription(NamesAndTypesList ordinary, bool all_virtuals)
{
for (auto & elem : ordinary)
add(ColumnDescription(std::move(elem.name), std::move(elem.type)));
add(ColumnDescription(std::move(elem.name), std::move(elem.type), all_virtuals));
}
@ -227,7 +232,7 @@ NamesAndTypesList ColumnsDescription::getOrdinary() const
{
NamesAndTypesList ret;
for (const auto & col : columns)
if (col.default_desc.kind == ColumnDefaultKind::Default)
if (col.default_desc.kind == ColumnDefaultKind::Default && !col.is_virtual)
ret.emplace_back(col.name, col.type);
return ret;
}
@ -250,6 +255,15 @@ NamesAndTypesList ColumnsDescription::getAliases() const
return ret;
}
NamesAndTypesList ColumnsDescription::getVirtuals() const
{
NamesAndTypesList result;
for (const auto & column : columns)
if (column.is_virtual)
result.emplace_back(column.name, column.type);
return result;
}
NamesAndTypesList ColumnsDescription::getAll() const
{
NamesAndTypesList ret;
@ -285,7 +299,7 @@ NamesAndTypesList ColumnsDescription::getAllPhysical() const
{
NamesAndTypesList ret;
for (const auto & col : columns)
if (col.default_desc.kind != ColumnDefaultKind::Alias)
if (col.default_desc.kind != ColumnDefaultKind::Alias && !col.is_virtual)
ret.emplace_back(col.name, col.type);
return ret;
}
@ -294,7 +308,7 @@ Names ColumnsDescription::getNamesOfPhysical() const
{
Names ret;
for (const auto & col : columns)
if (col.default_desc.kind != ColumnDefaultKind::Alias)
if (col.default_desc.kind != ColumnDefaultKind::Alias && !col.is_virtual)
ret.emplace_back(col.name);
return ret;
}
@ -302,7 +316,7 @@ Names ColumnsDescription::getNamesOfPhysical() const
NameAndTypePair ColumnsDescription::getPhysical(const String & column_name) const
{
auto it = columns.get<1>().find(column_name);
if (it == columns.get<1>().end() || it->default_desc.kind == ColumnDefaultKind::Alias)
if (it == columns.get<1>().end() || it->default_desc.kind == ColumnDefaultKind::Alias || it->is_virtual)
throw Exception("There is no physical column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
return NameAndTypePair(it->name, it->type);
}
@ -310,7 +324,7 @@ NameAndTypePair ColumnsDescription::getPhysical(const String & column_name) cons
bool ColumnsDescription::hasPhysical(const String & column_name) const
{
auto it = columns.get<1>().find(column_name);
return it != columns.get<1>().end() && it->default_desc.kind != ColumnDefaultKind::Alias;
return it != columns.get<1>().end() && it->default_desc.kind != ColumnDefaultKind::Alias && !it->is_virtual;
}

View File

@ -32,9 +32,10 @@ struct ColumnDescription
String comment;
CompressionCodecPtr codec;
ASTPtr ttl;
bool is_virtual = false;
ColumnDescription() = default;
ColumnDescription(String name_, DataTypePtr type_) : name(std::move(name_)), type(std::move(type_)) {}
ColumnDescription(String name_, DataTypePtr type_, bool is_virtual_);
bool operator==(const ColumnDescription & other) const;
bool operator!=(const ColumnDescription & other) const { return !(*this == other); }
@ -49,7 +50,7 @@ class ColumnsDescription
{
public:
ColumnsDescription() = default;
explicit ColumnsDescription(NamesAndTypesList ordinary_);
explicit ColumnsDescription(NamesAndTypesList ordinary_, bool all_virtuals = false);
/// `after_column` can be a Nested column name;
void add(ColumnDescription column, const String & after_column = String());
@ -67,8 +68,9 @@ public:
NamesAndTypesList getOrdinary() const;
NamesAndTypesList getMaterialized() const;
NamesAndTypesList getAliases() const;
/// ordinary + materialized + aliases.
NamesAndTypesList getAll() const;
NamesAndTypesList getVirtuals() const;
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + virtuals.
using ColumnTTLs = std::unordered_map<String, ASTPtr>;
ColumnTTLs getColumnTTLs() const;
@ -87,8 +89,6 @@ public:
throw Exception("Cannot modify ColumnDescription for column " + column_name + ": column name cannot be changed", ErrorCodes::LOGICAL_ERROR);
}
/// ordinary + materialized.
NamesAndTypesList getAllPhysical() const;
Names getNamesOfPhysical() const;
bool hasPhysical(const String & column_name) const;
NameAndTypePair getPhysical(const String & column_name) const;

View File

@ -25,28 +25,21 @@ IStorage::IStorage(ColumnsDescription columns_)
setColumns(std::move(columns_));
}
IStorage::IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))
{
setColumns(std::move(columns_));
}
const ColumnsDescription & IStorage::getColumns() const
{
return columns;
}
void IStorage::setColumns(ColumnsDescription columns_)
{
if (columns_.getOrdinary().empty())
throw Exception("Empty list of columns passed", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
columns = std::move(columns_);
}
const IndicesDescription & IStorage::getIndices() const
{
return indices;
}
void IStorage::setIndices(IndicesDescription indices_)
{
indices = std::move(indices_);
}
NameAndTypePair IStorage::getColumn(const String & column_name) const
{
/// By default, we assume that there are no virtual columns in the storage.
@ -69,6 +62,16 @@ Block IStorage::getSampleBlock() const
return res;
}
Block IStorage::getSampleBlockWithVirtuals() const
{
auto res = getSampleBlock();
for (const auto & column : getColumns().getVirtuals())
res.insert({column.type->createColumn(), column.type, column.name});
return res;
}
Block IStorage::getSampleBlockNonMaterialized() const
{
Block res;
@ -266,6 +269,29 @@ void IStorage::check(const Block & block, bool need_all) const
}
}
void IStorage::setColumns(ColumnsDescription columns_)
{
if (columns_.getOrdinary().empty())
throw Exception("Empty list of columns passed", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
columns = std::move(columns_);
for (const auto & column : virtuals)
{
if (!columns.has(column.name))
columns.add(column);
}
}
void IStorage::setIndices(IndicesDescription indices_)
{
indices = std::move(indices_);
}
bool IStorage::isVirtualColumn(const String & column_name) const
{
return getColumns().get(column_name).is_virtual;
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
{
TableStructureReadLockHolder result;

View File

@ -50,6 +50,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>
public:
IStorage() = default;
explicit IStorage(ColumnsDescription columns_);
IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_);
virtual ~IStorage() = default;
IStorage(const IStorage &) = delete;
@ -82,20 +83,18 @@ public:
public: /// thread-unsafe part. lockStructure must be acquired
const ColumnsDescription & getColumns() const;
void setColumns(ColumnsDescription columns_);
const ColumnsDescription & getColumns() const; /// returns combined set of columns
const IndicesDescription & getIndices() const;
void setIndices(IndicesDescription indices_);
/// NOTE: these methods should include virtual columns,
/// but should NOT include ALIAS columns (they are treated separately).
virtual NameAndTypePair getColumn(const String & column_name) const;
virtual bool hasColumn(const String & column_name) const;
Block getSampleBlock() const;
Block getSampleBlockNonMaterialized() const;
Block getSampleBlockForColumns(const Names & column_names) const; /// including virtual and alias columns.
Block getSampleBlock() const; /// ordinary + materialized.
Block getSampleBlockWithVirtuals() const; /// ordinary + materialized + virtuals.
Block getSampleBlockNonMaterialized() const; /// ordinary.
Block getSampleBlockForColumns(const Names & column_names) const; /// ordinary + materialized + aliases + virtuals.
/// Verify that all the requested names are in the table and are set correctly:
/// list of names is not empty and the names do not repeat.
@ -112,8 +111,17 @@ public: /// thread-unsafe part. lockStructure must be acquired
/// If |need_all| is set, then checks that all the columns of the table are in the block.
void check(const Block & block, bool need_all = false) const;
protected: /// still thread-unsafe part.
void setColumns(ColumnsDescription columns_); /// sets only real columns, possibly overwrites virtual ones.
void setIndices(IndicesDescription indices_);
/// Returns whether the column is virtual - by default, all columns are real.
/// An initially reserved virtual column name may be shadowed by a real column.
virtual bool isVirtualColumn(const String & column_name) const;
private:
ColumnsDescription columns;
ColumnsDescription columns; /// combined real and virtual columns
const ColumnsDescription virtuals = {};
IndicesDescription indices;
public:
@ -322,12 +330,6 @@ public:
/// Returns additional columns that need to be read for FINAL to work.
virtual Names getColumnsRequiredForFinal() const { return {}; }
protected:
/// Returns whether the column is virtual - by default all columns are real.
/// Initially reserved virtual column name may be shadowed by real column.
/// Returns false even for non-existent non-virtual columns.
virtual bool isVirtualColumn(const String & /* column_name */) const { return false; }
private:
/// You always need to take the next three locks in this order.

View File

@ -1,5 +1,7 @@
#include <Storages/Kafka/KafkaBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
@ -7,15 +9,17 @@ namespace DB
{
KafkaBlockInputStream::KafkaBlockInputStream(
StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
: storage(storage_), context(context_), max_block_size(max_block_size_)
StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_)
: storage(storage_), context(context_), column_names(columns), max_block_size(max_block_size_)
{
context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_allow_errors_ratio", 0.);
context.setSetting("input_format_allow_errors_num", storage.skip_broken);
context.setSetting("input_format_allow_errors_num", storage.skipBroken());
if (!schema.empty())
context.setSetting("format_schema", schema);
if (!storage.getSchemaName().empty())
context.setSetting("format_schema", storage.getSchemaName());
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns();
}
KafkaBlockInputStream::~KafkaBlockInputStream()
@ -29,6 +33,11 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
storage.pushBuffer(buffer);
}
Block KafkaBlockInputStream::getHeader() const
{
return storage.getSampleBlockForColumns(column_names);
}
void KafkaBlockInputStream::readPrefixImpl()
{
buffer = storage.tryClaimBuffer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
@ -37,20 +46,49 @@ void KafkaBlockInputStream::readPrefixImpl()
if (!buffer)
buffer = storage.createBuffer();
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.topics);
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.getTopics());
const auto & limits = getLimits();
const size_t poll_timeout = buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->pollTimeout();
size_t rows_portion_size = poll_timeout ? std::min<size_t>(max_block_size, limits.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
rows_portion_size = std::max(rows_portion_size, 1ul);
auto child = FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size, rows_portion_size);
auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
auto read_callback = [this]
{
const auto * sub_buffer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>();
virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(sub_buffer->currentKey()); // "key"
virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset"
};
auto child = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size, read_callback);
child->setLimits(limits);
addChild(child);
broken = true;
}
Block KafkaBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block)
return block;
Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneWithColumns(std::move(virtual_columns));
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns();
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
block.insert(column);
/// FIXME: materialize MATERIALIZED columns here.
return ConvertingBlockInputStream(
context, std::make_shared<OneBlockInputStream>(block), getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name)
.read();
}
void KafkaBlockInputStream::readSuffixImpl()
{
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();

View File

@ -11,22 +11,24 @@ namespace DB
class KafkaBlockInputStream : public IBlockInputStream
{
public:
KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_);
KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_);
~KafkaBlockInputStream() override;
String getName() const override { return storage.getName(); }
Block readImpl() override { return children.back()->read(); }
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override;
void readPrefixImpl() override;
Block readImpl() override;
void readSuffixImpl() override;
private:
StorageKafka & storage;
Context context;
Names column_names;
UInt64 max_block_size;
BufferPtr buffer;
MutableColumns virtual_columns;
bool broken = true, claimed = false;
};

View File

@ -3,6 +3,7 @@
namespace DB
{
using namespace std::chrono_literals;
ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, bool intermediate_commit_)
: ReadBuffer(nullptr, 0)
@ -17,7 +18,10 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
{
/// NOTE: see https://github.com/edenhill/librdkafka/issues/2077
consumer->unsubscribe();
consumer->unassign();
while (consumer->get_consumer_queue().next_event(1s));
}
void ReadBufferFromKafkaConsumer::commit()
@ -53,8 +57,6 @@ void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
// If we're doing a manual select then it's better to get something after a wait, then immediate nothing.
if (consumer->get_subscription().empty())
{
using namespace std::chrono_literals;
consumer->pause(); // don't accidentally read any messages
consumer->subscribe(topics);
consumer->poll(5s);
@ -73,7 +75,7 @@ void ReadBufferFromKafkaConsumer::unsubscribe()
consumer->unsubscribe();
}
/// Try to commit messages implicitly after we processed the previous batch.
/// Do commit messages implicitly after we processed the previous batch.
bool ReadBufferFromKafkaConsumer::nextImpl()
{
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
@ -86,18 +88,21 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
{
if (intermediate_commit)
commit();
messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
if (new_messages.empty())
{
LOG_TRACE(log, "Stalled");
stalled = true;
return false;
}
messages = std::move(new_messages);
current = messages.begin();
LOG_TRACE(log, "Polled batch of " << messages.size() << " messages");
}
if (messages.empty())
{
stalled = true;
return false;
}
if (auto err = current->get_error())
{
++current;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Names.h>
#include <Core/Types.h>
#include <IO/DelimitedReadBuffer.h>
#include <common/logger_useful.h>
@ -25,6 +26,11 @@ public:
auto pollTimeout() { return poll_timeout; }
// Return values for the message that's being read.
String currentTopic() const { return current[-1].get_topic(); }
String currentKey() const { return current[-1].get_key(); }
auto currentOffset() const { return current[-1].get_offset(); }
private:
using Messages = std::vector<cppkafka::Message>;

View File

@ -4,6 +4,8 @@
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
@ -69,21 +71,36 @@ StorageKafka::StorageKafka(
const std::string & database_name_,
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_,
size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken_,
const String & brokers_,
const String & group_,
const Names & topics_,
const String & format_name_,
char row_delimiter_,
const String & schema_name_,
size_t num_consumers_,
UInt64 max_block_size_,
size_t skip_broken_,
bool intermediate_commit_)
: IStorage{columns_},
table_name(table_name_), database_name(database_name_), global_context(context_),
topics(global_context.getMacros()->expand(topics_)),
brokers(global_context.getMacros()->expand(brokers_)),
group(global_context.getMacros()->expand(group_)),
format_name(global_context.getMacros()->expand(format_name_)),
row_delimiter(row_delimiter_),
schema_name(global_context.getMacros()->expand(schema_name_)),
num_consumers(num_consumers_), max_block_size(max_block_size_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
semaphore(0, num_consumers_),
skip_broken(skip_broken_), intermediate_commit(intermediate_commit_)
: IStorage(
columns_,
ColumnsDescription({{"_topic", std::make_shared<DataTypeString>()},
{"_key", std::make_shared<DataTypeString>()},
{"_offset", std::make_shared<DataTypeUInt64>()}}, true))
, table_name(table_name_)
, database_name(database_name_)
, global_context(context_)
, topics(global_context.getMacros()->expand(topics_))
, brokers(global_context.getMacros()->expand(brokers_))
, group(global_context.getMacros()->expand(group_))
, format_name(global_context.getMacros()->expand(format_name_))
, row_delimiter(row_delimiter_)
, schema_name(global_context.getMacros()->expand(schema_name_))
, num_consumers(num_consumers_)
, max_block_size(max_block_size_)
, log(&Logger::get("StorageKafka (" + table_name_ + ")"))
, semaphore(0, num_consumers_)
, skip_broken(skip_broken_)
, intermediate_commit(intermediate_commit_)
{
task = global_context.getSchedulePool().createTask(log->name(), [this]{ streamThread(); });
task->deactivate();
@ -92,14 +109,12 @@ StorageKafka::StorageKafka(
BlockInputStreams StorageKafka::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const SelectQueryInfo & /* query_info */,
const Context & context,
QueryProcessingStage::Enum /* processed_stage */,
size_t /* max_block_size */,
unsigned /* num_streams */)
{
check(column_names);
if (num_created_consumers == 0)
return BlockInputStreams();
@ -113,7 +128,7 @@ BlockInputStreams StorageKafka::read(
/// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
/// TODO: probably that leads to awful performance.
/// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, 1));
streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1));
}
LOG_DEBUG(log, "Starting reading " << streams.size() << " streams");
@ -161,55 +176,22 @@ void StorageKafka::shutdown()
}
void StorageKafka::rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name)
{
table_name = new_table_name;
database_name = new_database_name;
}
void StorageKafka::updateDependencies()
{
task->activateAndSchedule();
}
cppkafka::Configuration StorageKafka::createConsumerConfiguration()
{
cppkafka::Configuration conf;
LOG_TRACE(log, "Setting brokers: " << brokers);
conf.set("metadata.broker.list", brokers);
LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
// If no offset stored for this group, read all messages from the start
conf.set("auto.offset.reset", "smallest");
// We manually commit offsets after a stream successfully finished
conf.set("enable.auto.commit", "false");
// Ignore EOF messages
conf.set("enable.partition.eof", "false");
// for debug logs inside rdkafka
// conf.set("debug", "consumer,cgrp,topic,fetch");
// Update consumer configuration from the configuration
const auto & config = global_context.getConfigRef();
if (config.has(CONFIG_PREFIX))
loadFromConfig(conf, config, CONFIG_PREFIX);
// Update consumer topic-specific configuration
for (const auto & topic : topics)
{
const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
}
return conf;
}
BufferPtr StorageKafka::createBuffer()
{
// Create a consumer.
// Create a consumer and subscribe to topics
auto consumer = std::make_shared<cppkafka::Consumer>(createConsumerConfiguration());
// Limit the number of batched messages to allow early cancellations
@ -253,6 +235,47 @@ void StorageKafka::pushBuffer(BufferPtr buffer)
semaphore.set();
}
cppkafka::Configuration StorageKafka::createConsumerConfiguration()
{
cppkafka::Configuration conf;
LOG_TRACE(log, "Setting brokers: " << brokers);
conf.set("metadata.broker.list", brokers);
LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
// If no offset stored for this group, read all messages from the start
conf.set("auto.offset.reset", "smallest");
// We manually commit offsets after a stream successfully finished
conf.set("enable.auto.commit", "false");
// Ignore EOF messages
conf.set("enable.partition.eof", "false");
// for debug logs inside rdkafka
// conf.set("debug", "consumer,cgrp,topic,fetch");
// Update consumer configuration from the configuration
const auto & config = global_context.getConfigRef();
if (config.has(CONFIG_PREFIX))
loadFromConfig(conf, config, CONFIG_PREFIX);
// Update consumer topic-specific configuration
for (const auto & topic : topics)
{
const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
}
return conf;
}
bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name)
{
// Check if all dependencies are attached
@ -321,19 +344,23 @@ bool StorageKafka::streamToViews()
auto insert = std::make_shared<ASTInsertQuery>();
insert->database = database_name;
insert->table = table_name;
insert->no_destination = true; // Only insert into dependent views
insert->no_destination = true; // Only insert into dependent views and expect that input blocks contain virtual columns
const Settings & settings = global_context.getSettingsRef();
size_t block_size = max_block_size;
if (block_size == 0)
block_size = settings.max_block_size.value;
// Create a stream for each consumer and join them in a union stream
InterpreterInsertQuery interpreter{insert, global_context};
auto block_io = interpreter.execute();
// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
streams.reserve(num_created_consumers);
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto stream = std::make_shared<KafkaBlockInputStream>(*this, global_context, schema_name, block_size);
auto stream = std::make_shared<KafkaBlockInputStream>(*this, global_context, block_io.out->getHeader().getNames(), block_size);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL
@ -350,9 +377,6 @@ bool StorageKafka::streamToViews()
else
in = streams[0];
// Execute the query
InterpreterInsertQuery interpreter{insert, global_context};
auto block_io = interpreter.execute();
copyData(*in, *block_io.out, &stream_cancelled);
// Check whether the limits were applied during query execution

View File

@ -20,9 +20,6 @@ namespace DB
*/
class StorageKafka : public ext::shared_ptr_helper<StorageKafka>, public IStorage
{
friend class KafkaBlockInputStream;
friend class KafkaBlockOutputStream;
public:
std::string getName() const override { return "Kafka"; }
std::string getTableName() const override { return table_name; }
@ -39,14 +36,31 @@ public:
size_t max_block_size,
unsigned num_streams) override;
void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
}
void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name) override;
void updateDependencies() override;
BufferPtr createBuffer();
BufferPtr claimBuffer();
BufferPtr tryClaimBuffer(long wait_ms);
void pushBuffer(BufferPtr buf);
const auto & getTopics() const { return topics; }
const auto & getFormatName() const { return format_name; }
const auto & getSchemaName() const { return schema_name; }
const auto & skipBroken() const { return skip_broken; }
protected:
StorageKafka(
const std::string & table_name_,
const std::string & database_name_,
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_,
size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
bool intermediate_commit_);
private:
// Configuration and state
String table_name;
@ -56,18 +70,15 @@ private:
const String brokers;
const String group;
const String format_name;
// Optional row delimiter for generating char delimited stream
// in order to make various input stream parsers happy.
char row_delimiter;
char row_delimiter; /// optional row delimiter for generating char delimited stream in order to make various input stream parsers happy.
const String schema_name;
/// Total number of consumers
size_t num_consumers;
/// Maximum block size for insertion into this table
UInt64 max_block_size;
/// Number of actually created consumers.
size_t num_consumers; /// total number of consumers
UInt64 max_block_size; /// maximum block size for insertion into this table
/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
size_t num_created_consumers = 0;
size_t num_created_consumers = 0; /// number of actually created consumers.
Poco::Logger * log;
// Consumer list
@ -84,25 +95,10 @@ private:
std::atomic<bool> stream_cancelled{false};
cppkafka::Configuration createConsumerConfiguration();
BufferPtr createBuffer();
BufferPtr claimBuffer();
BufferPtr tryClaimBuffer(long wait_ms);
void pushBuffer(BufferPtr buf);
void streamThread();
bool streamToViews();
bool checkDependencies(const String & database_name, const String & table_name);
protected:
StorageKafka(
const std::string & table_name_,
const std::string & database_name_,
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_,
size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
bool intermediate_commit_);
};
}

View File

@ -254,12 +254,12 @@ void StorageCatBoostPool::createSampleBlockAndColumns()
/// Order is important: first numeric columns, then categorial, then all others.
for (const auto & column : num_columns)
columns.add(DB::ColumnDescription(column.name, column.type));
columns.add(DB::ColumnDescription(column.name, column.type, false));
for (const auto & column : cat_columns)
columns.add(DB::ColumnDescription(column.name, column.type));
columns.add(DB::ColumnDescription(column.name, column.type, false));
for (const auto & column : other_columns)
{
DB::ColumnDescription column_desc(column.name, column.type);
DB::ColumnDescription column_desc(column.name, column.type, false);
/// We assign Materialized kind to the column so that it doesn't show in SELECT *.
/// Because the table is readonly, we do not need default expression.
column_desc.default_desc.kind = ColumnDefaultKind::Materialized;
@ -270,7 +270,7 @@ void StorageCatBoostPool::createSampleBlockAndColumns()
{
if (!desc.alias.empty())
{
DB::ColumnDescription column(desc.alias, get_type(desc.column_type));
DB::ColumnDescription column(desc.alias, get_type(desc.column_type), false);
column.default_desc.kind = ColumnDefaultKind::Alias;
column.default_desc.expression = std::make_shared<ASTIdentifier>(desc.column_name);
columns.add(std::move(column));

View File

@ -51,9 +51,11 @@ StorageMerge::StorageMerge(
const String & source_database_,
const String & table_name_regexp_,
const Context & context_)
: IStorage{columns_},
name(name_), source_database(source_database_),
table_name_regexp(table_name_regexp_), global_context(context_)
: IStorage(columns_, ColumnsDescription({{"_table", std::make_shared<DataTypeString>()}}, true))
, name(name_)
, source_database(source_database_)
, table_name_regexp(table_name_regexp_)
, global_context(context_)
{
}
@ -61,44 +63,29 @@ StorageMerge::StorageMerge(
/// NOTE: structure of underlying tables as well as their set are not constant,
/// so the results of these methods may become obsolete after the call.
bool StorageMerge::isVirtualColumn(const String & column_name) const
{
if (column_name != "_table")
return false;
return !IStorage::hasColumn(column_name);
}
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
{
if (IStorage::hasColumn(column_name))
return IStorage::getColumn(column_name);
if (!IStorage::hasColumn(column_name))
{
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->getColumn(column_name);
}
/// virtual column of the Merge table itself
if (column_name == "_table")
return { column_name, std::make_shared<DataTypeString>() };
/// virtual (and real) columns of the underlying tables
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->getColumn(column_name);
throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
return IStorage::getColumn(column_name);
}
bool StorageMerge::hasColumn(const String & column_name) const
{
if (column_name == "_table")
return true;
if (!IStorage::hasColumn(column_name))
{
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->hasColumn(column_name);
}
if (IStorage::hasColumn(column_name))
return true;
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->hasColumn(column_name);
return false;
return true;
}
@ -188,7 +175,7 @@ BlockInputStreams StorageMerge::read(
for (const auto & column_name : column_names)
{
if (isVirtualColumn(column_name))
if (column_name == "_table" && isVirtualColumn(column_name))
has_table_virtual_column = true;
else
real_column_names.push_back(column_name);

View File

@ -26,6 +26,7 @@ public:
bool supportsFinal() const override { return true; }
bool supportsIndexForIn() const override { return true; }
/// Consider columns coming from the underlying tables
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
@ -86,8 +87,6 @@ protected:
void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
BlockInputStreamPtr & source_stream, QueryProcessingStage::Enum processed_stage);
bool isVirtualColumn(const String & column_name) const override;
};
}

View File

@ -257,7 +257,7 @@ StorageSystemPartsBase::StorageSystemPartsBase(std::string name_, NamesAndTypesL
auto add_alias = [&](const String & alias_name, const String & column_name)
{
ColumnDescription column(alias_name, columns.get(column_name).type);
ColumnDescription column(alias_name, columns.get(column_name).type, false);
column.default_desc.kind = ColumnDefaultKind::Alias;
column.default_desc.expression = std::make_shared<ASTIdentifier>(column_name);
columns.add(column);

View File

@ -23,71 +23,11 @@
namespace DB
{
namespace VirtualColumnUtils
namespace
{
String chooseSuffix(const NamesAndTypesList & columns, const String & name)
{
int id = 0;
String current_suffix;
while (true)
{
bool done = true;
for (const auto & it : columns)
if (it.name == name + current_suffix)
{
done = false;
break;
}
if (done) break;
++id;
current_suffix = toString<Int32>(id);
}
return current_suffix;
}
String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector<String> & names)
{
int id = 0;
String current_suffix;
while (true)
{
bool done = true;
for (const auto & it : columns)
{
for (size_t i = 0; i < names.size(); ++i)
{
if (it.name == names[i] + current_suffix)
{
done = false;
break;
}
}
if (!done)
break;
}
if (done)
break;
++id;
current_suffix = toString<Int32>(id);
}
return current_suffix;
}
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value)
{
auto & select = ast->as<ASTSelectQuery &>();
if (!select.with())
select.setExpression(ASTSelectQuery::Expression::WITH, std::make_shared<ASTExpressionList>());
auto literal = std::make_shared<ASTLiteral>(value);
literal->alias = column_name;
literal->prefer_alias_to_column_name = true;
select.with()->children.push_back(literal);
}
/// Verifying that the function depends only on the specified columns
static bool isValidFunction(const ASTPtr & expression, const NameSet & columns)
bool isValidFunction(const ASTPtr & expression, const NameSet & columns)
{
for (size_t i = 0; i < expression->children.size(); ++i)
if (!isValidFunction(expression->children[i], columns))
@ -100,7 +40,7 @@ static bool isValidFunction(const ASTPtr & expression, const NameSet & columns)
}
/// Extract all subfunctions of the main conjunction, but depending only on the specified columns
static void extractFunctions(const ASTPtr & expression, const NameSet & columns, std::vector<ASTPtr> & result)
void extractFunctions(const ASTPtr & expression, const NameSet & columns, std::vector<ASTPtr> & result)
{
const auto * function = expression->as<ASTFunction>();
if (function && function->name == "and")
@ -115,7 +55,7 @@ static void extractFunctions(const ASTPtr & expression, const NameSet & columns,
}
/// Construct a conjunction from given functions
static ASTPtr buildWhereExpression(const ASTs & functions)
ASTPtr buildWhereExpression(const ASTs & functions)
{
if (functions.size() == 0)
return nullptr;
@ -130,6 +70,23 @@ static ASTPtr buildWhereExpression(const ASTs & functions)
return new_query;
}
}
namespace VirtualColumnUtils
{
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value)
{
auto & select = ast->as<ASTSelectQuery &>();
if (!select.with())
select.setExpression(ASTSelectQuery::Expression::WITH, std::make_shared<ASTExpressionList>());
auto literal = std::make_shared<ASTLiteral>(value);
literal->alias = column_name;
literal->prefer_alias_to_column_name = true;
select.with()->children.push_back(literal);
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context)
{
const auto & select = query->as<ASTSelectQuery &>();

View File

@ -3,7 +3,7 @@
#include <set>
#include <Core/Block.h>
#include <Parsers/IAST.h>
#include <Parsers/IAST_fwd.h>
namespace DB
@ -16,13 +16,6 @@ class NamesAndTypesList;
namespace VirtualColumnUtils
{
/// Calculate the minimum numeric suffix to add to the string so that it is not present in the set
String chooseSuffix(const NamesAndTypesList & columns, const String & name);
/// Calculate the minimum total numeric suffix to add to each string,
/// so that none is present in the set.
String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector<String> & names);
/// Adds to the select query section `select column_name as value`
/// For example select _port as 9000.
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value);
@ -33,14 +26,14 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context);
/// Extract from the input stream a set of `name` column values
template <typename T1>
std::multiset<T1> extractSingleValueFromBlock(const Block & block, const String & name)
template <typename T>
std::multiset<T> extractSingleValueFromBlock(const Block & block, const String & name)
{
std::multiset<T1> res;
std::multiset<T> res;
const ColumnWithTypeAndName & data = block.getByName(name);
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
res.insert((*data.column)[i].get<T1>());
res.insert((*data.column)[i].get<T>());
return res;
}

View File

@ -338,30 +338,32 @@ class ClickHouseCluster:
self.docker_client = docker.from_env(version=self.docker_api_version)
common_opts = ['up', '-d', '--force-recreate']
if self.with_zookeeper and self.base_zookeeper_cmd:
subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate'])
subprocess_check_call(self.base_zookeeper_cmd + common_opts)
for command in self.pre_zookeeper_commands:
self.run_kazoo_commands_with_retries(command, repeats=5)
self.wait_zookeeper_to_start(120)
if self.with_mysql and self.base_mysql_cmd:
subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate'])
subprocess_check_call(self.base_mysql_cmd + common_opts)
self.wait_mysql_to_start(120)
if self.with_postgres and self.base_postgres_cmd:
subprocess_check_call(self.base_postgres_cmd + ['up', '-d', '--force-recreate'])
subprocess_check_call(self.base_postgres_cmd + common_opts)
self.wait_postgres_to_start(120)
if self.with_kafka and self.base_kafka_cmd:
subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate'])
subprocess_check_call(self.base_kafka_cmd + common_opts + ['--renew-anon-volumes'])
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
if self.with_hdfs and self.base_hdfs_cmd:
subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate'])
subprocess_check_call(self.base_hdfs_cmd + common_opts)
self.wait_hdfs_to_start(120)
if self.with_mongo and self.base_mongo_cmd:
subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate'])
subprocess_check_call(self.base_mongo_cmd + common_opts)
self.wait_mongo_to_start(30)
subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate'])

View File

@ -22,7 +22,6 @@ import kafka_pb2
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add a test that the mat. view works.
# TODO: add a test that SELECT LIMIT works.
# TODO: modify tests to respect `skip_broken_messages` setting.
@ -86,8 +85,8 @@ def kafka_produce_protobuf_messages(topic, start_index, num_messages):
# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
def kafka_check_result(result, check=False):
fpath = p.join(p.dirname(__file__), 'test_kafka_json.reference')
def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference'):
fpath = p.join(p.dirname(__file__), ref_file)
with open(fpath) as reference:
if check:
assert TSV(result) == TSV(reference)
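# A minimal usage sketch, mirroring the polling pattern in the tests below:
#   result = ''
#   while True:
#       result += instance.query('SELECT * FROM test.kafka')
#       if kafka_check_result(result):
#           break
#   kafka_check_result(result, True)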
@ -148,13 +147,12 @@ def test_kafka_settings_new_syntax(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'new',
kafka_group_name = 'new',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n',
kafka_skip_broken_messages = 1;
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'new',
kafka_group_name = 'new',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n',
kafka_skip_broken_messages = 1;
''')
messages = []
@ -172,7 +170,7 @@ def test_kafka_settings_new_syntax(kafka_cluster):
kafka_produce('new', messages)
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@ -183,12 +181,11 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_group_name = 'csv',
kafka_format = 'CSV',
kafka_row_delimiter = '\\n';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_group_name = 'csv',
kafka_format = 'CSV',
kafka_row_delimiter = '\\n';
''')
messages = []
@ -197,7 +194,7 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
kafka_produce('csv', messages)
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@ -208,12 +205,11 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'tsv',
kafka_group_name = 'tsv',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'tsv',
kafka_group_name = 'tsv',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
''')
messages = []
@ -222,7 +218,7 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_produce('tsv', messages)
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@ -233,11 +229,10 @@ def test_kafka_json_without_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow';
''')
messages = ''
@ -251,7 +246,7 @@ def test_kafka_json_without_delimiter(kafka_cluster):
kafka_produce('json', [messages])
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@ -262,12 +257,11 @@ def test_kafka_protobuf(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'pb',
kafka_group_name = 'pb',
kafka_format = 'Protobuf',
kafka_schema = 'kafka.proto:KeyValuePair';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'pb',
kafka_group_name = 'pb',
kafka_format = 'Protobuf',
kafka_schema = 'kafka.proto:KeyValuePair';
''')
kafka_produce_protobuf_messages('pb', 0, 20)
@ -275,7 +269,7 @@ def test_kafka_protobuf(kafka_cluster):
kafka_produce_protobuf_messages('pb', 21, 29)
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@ -288,12 +282,11 @@ def test_kafka_materialized_view(kafka_cluster):
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'mv',
kafka_group_name = 'mv',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
@ -304,9 +297,9 @@ def test_kafka_materialized_view(kafka_cluster):
messages = []
for i in range(50):
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('json', messages)
kafka_produce('mv', messages)
for i in range(20):
while True:
time.sleep(1)
result = instance.query('SELECT * FROM test.view')
if kafka_check_result(result):
@ -321,7 +314,7 @@ def test_kafka_materialized_view(kafka_cluster):
@pytest.mark.skip(reason="Hangs")
def test_kafka_flush_on_big_message(kafka_cluster):
# Create batches of messages of size ~100 KB
kafka_messages = 10000
kafka_messages = 1000
batch_messages = 1000
messages = [json.dumps({'key': i, 'value': 'x' * 100}) * batch_messages for i in range(kafka_messages)]
kafka_produce('flush', messages)
@ -331,12 +324,11 @@ def test_kafka_flush_on_big_message(kafka_cluster):
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'flush',
kafka_group_name = 'flush',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 10;
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'flush',
kafka_group_name = 'flush',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 10;
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree
ORDER BY key;
@ -356,7 +348,7 @@ def test_kafka_flush_on_big_message(kafka_cluster):
except kafka.errors.GroupCoordinatorNotAvailableError:
continue
for _ in range(20):
while True:
time.sleep(1)
result = instance.query('SELECT count() FROM test.view')
if int(result) == kafka_messages*batch_messages:
@ -365,6 +357,71 @@ def test_kafka_flush_on_big_message(kafka_cluster):
assert int(result) == kafka_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)
def test_kafka_virtual_columns(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'virt1',
kafka_group_name = 'virt1',
kafka_format = 'JSONEachRow';
''')
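# The Kafka engine exposes virtual columns in addition to the declared ones;
# _key, _topic and _offset are selected explicitly below, since virtual columns are not covered by SELECT *.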
messages = ''
for i in range(25):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('virt1', [messages])
messages = ''
for i in range(25, 50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('virt1', [messages])
result = ''
while True:
time.sleep(1)
result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka')
if kafka_check_result(result, False, 'test_kafka_virtual1.reference'):
break
kafka_check_result(result, True, 'test_kafka_virtual1.reference')
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'virt2',
kafka_group_name = 'virt2',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT *, _key as kafka_key, _topic as topic, _offset as offset FROM test.kafka;
''')
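# The materialized view copies the Kafka virtual columns (_key, _topic, _offset)
# into ordinary columns of test.view, so their values remain queryable after consumption.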
messages = []
for i in range(50):
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('virt2', messages)
while True:
time.sleep(1)
result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view')
if kafka_check_result(result, False, 'test_kafka_virtual2.reference'):
break
kafka_check_result(result, True, 'test_kafka_virtual2.reference')
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")

View File

@ -0,0 +1,50 @@
0 virt1 0 0
1 virt1 1 0
2 virt1 2 0
3 virt1 3 0
4 virt1 4 0
5 virt1 5 0
6 virt1 6 0
7 virt1 7 0
8 virt1 8 0
9 virt1 9 0
10 virt1 10 0
11 virt1 11 0
12 virt1 12 0
13 virt1 13 0
14 virt1 14 0
15 virt1 15 0
16 virt1 16 0
17 virt1 17 0
18 virt1 18 0
19 virt1 19 0
20 virt1 20 0
21 virt1 21 0
22 virt1 22 0
23 virt1 23 0
24 virt1 24 0
25 virt1 25 1
26 virt1 26 1
27 virt1 27 1
28 virt1 28 1
29 virt1 29 1
30 virt1 30 1
31 virt1 31 1
32 virt1 32 1
33 virt1 33 1
34 virt1 34 1
35 virt1 35 1
36 virt1 36 1
37 virt1 37 1
38 virt1 38 1
39 virt1 39 1
40 virt1 40 1
41 virt1 41 1
42 virt1 42 1
43 virt1 43 1
44 virt1 44 1
45 virt1 45 1
46 virt1 46 1
47 virt1 47 1
48 virt1 48 1
49 virt1 49 1

View File

@ -0,0 +1,50 @@
0 virt2 0 0
1 virt2 1 1
2 virt2 2 2
3 virt2 3 3
4 virt2 4 4
5 virt2 5 5
6 virt2 6 6
7 virt2 7 7
8 virt2 8 8
9 virt2 9 9
10 virt2 10 10
11 virt2 11 11
12 virt2 12 12
13 virt2 13 13
14 virt2 14 14
15 virt2 15 15
16 virt2 16 16
17 virt2 17 17
18 virt2 18 18
19 virt2 19 19
20 virt2 20 20
21 virt2 21 21
22 virt2 22 22
23 virt2 23 23
24 virt2 24 24
25 virt2 25 25
26 virt2 26 26
27 virt2 27 27
28 virt2 28 28
29 virt2 29 29
30 virt2 30 30
31 virt2 31 31
32 virt2 32 32
33 virt2 33 33
34 virt2 34 34
35 virt2 35 35
36 virt2 36 36
37 virt2 37 37
38 virt2 38 38
39 virt2 39 39
40 virt2 40 40
41 virt2 41 41
42 virt2 42 42
43 virt2 43 43
44 virt2 44 44
45 virt2 45 45
46 virt2 46 46
47 virt2 47 47
48 virt2 48 48
49 virt2 49 49

View File

@ -27,11 +27,11 @@ Example 2:
Let's say you have an old table (WatchLog_old) and decided to change the partitioning without moving data to a new table (WatchLog_new), and you still need to see data from both tables.
```
CREATE TABLE WatchLog_old(date Date, UserId Int64, EventType String, Cnt UInt64)
CREATE TABLE WatchLog_old(date Date, UserId Int64, EventType String, Cnt UInt64)
ENGINE=MergeTree(date, (UserId, EventType), 8192);
INSERT INTO WatchLog_old VALUES ('2018-01-01', 1, 'hit', 3);
CREATE TABLE WatchLog_new(date Date, UserId Int64, EventType String, Cnt UInt64)
CREATE TABLE WatchLog_new(date Date, UserId Int64, EventType String, Cnt UInt64)
ENGINE=MergeTree PARTITION BY date ORDER BY (UserId, EventType) SETTINGS index_granularity=8192;
INSERT INTO WatchLog_new VALUES ('2018-01-02', 2, 'hit', 3);
@ -61,7 +61,9 @@ Virtual columns differ from normal columns in the following ways:
- They are not selected when using the asterisk (`SELECT *`).
- Virtual columns are not shown in `SHOW CREATE TABLE` and `DESC TABLE` queries.
The `Merge` type table contains a virtual `_table` column of the `String` type. (If the table already has a `_table` column, the virtual column is called `_table1`; if you already have `_table1`, it's called `_table2`, and so on.) It contains the name of the table that data was read from.
The `Merge` type table contains the virtual column `_table` of type `String`. It contains the name of the table that data was read from. If any underlying table already has a `_table` column, the virtual column is shadowed and is not accessible.
<!-- TODO: what if the underlying tables have different sets of columns? -->
If the `WHERE/PREWHERE` clause contains conditions on the `_table` column that do not depend on other table columns (as one of the conjunction elements, or as an entire expression), these conditions are used as an index: they are evaluated against the set of table names to read data from, and data is read only from the tables for which the condition holds.
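For example, assuming a `Merge` table named `WatchLog` over the tables above (the name is illustrative), the following query reads only from `WatchLog_old`; note that `_table` must be selected explicitly because virtual columns are not included in `SELECT *`:
```
SELECT date, UserId, EventType, Cnt, _table
FROM WatchLog
WHERE _table = 'WatchLog_old'
```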