Add buffer callback to fill in virtual columns

This commit is contained in:
Ivan Lezhankin 2019-05-23 14:15:18 +03:00
parent dd906eabdc
commit 800854119e
19 changed files with 67 additions and 13 deletions

View File

@ -12,7 +12,7 @@ namespace DB
class OneBlockInputStream : public IBlockInputStream
{
public:
OneBlockInputStream(const Block & block_) : block(block_) {}
explicit OneBlockInputStream(const Block & block_) : block(block_) {}
String getName() const override { return "One"; }

View File

@ -65,6 +65,7 @@ void registerInputFormatRowBinary(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
@ -78,6 +79,7 @@ void registerInputFormatRowBinary(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(

View File

@ -479,6 +479,7 @@ void registerInputFormatCSV(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(

View File

@ -307,6 +307,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(

View File

@ -27,7 +27,14 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
}
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size) const
BlockInputStreamPtr FormatFactory::getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
BufferCallback callback) const
{
const auto & input_getter = getCreators(name).first;
if (!input_getter)
@ -47,7 +54,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
return input_getter(buf, sample, context, max_block_size, rows_portion_size, format_settings);
return input_getter(
buf, sample, context, max_block_size, rows_portion_size, callback ? callback : [] {}, format_settings);
}

View File

@ -24,6 +24,9 @@ class WriteBuffer;
*/
class FormatFactory final : public ext::singleton<FormatFactory>
{
public:
using BufferCallback = std::function<void()>;
private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
@ -31,6 +34,7 @@ private:
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
BufferCallback callback,
const FormatSettings & settings)>;
using OutputCreator = std::function<BlockOutputStreamPtr(
@ -44,8 +48,14 @@ private:
using FormatsDictionary = std::unordered_map<String, Creators>;
public:
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size = 0) const;
BlockInputStreamPtr getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size = 0,
BufferCallback callback = {}) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;

View File

@ -260,6 +260,7 @@ void registerInputFormatJSONEachRow(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(

View File

@ -14,6 +14,7 @@ void registerInputFormatNative(FormatFactory & factory)
const Context &,
UInt64 /* max_block_size */,
UInt64 /* min_read_rows */,
FormatFactory::BufferCallback /* callback */,
const FormatSettings &)
{
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);

View File

@ -477,6 +477,7 @@ void registerInputFormatParquet(FormatFactory & factory)
const Context & context,
UInt64 /* max_block_size */,
UInt64 /* rows_portion_size */,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & /* settings */) { return std::make_shared<ParquetBlockInputStream>(buf, sample, context); });
}

View File

@ -73,6 +73,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(

View File

@ -199,6 +199,7 @@ void registerInputFormatTSKV(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(

View File

@ -457,6 +457,7 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
@ -473,6 +474,7 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
@ -489,6 +491,7 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Context &,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(

View File

@ -156,6 +156,7 @@ void registerInputFormatValues(FormatFactory & factory)
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
FormatFactory::BufferCallback /* callback */,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(

View File

@ -75,8 +75,10 @@ void collectSourceColumns(const ASTSelectQuery * select_query, StoragePtr storag
if (select_query)
{
const auto & storage_aliases = storage->getColumns().getAliasesAndVirtuals();
const auto & storage_aliases = storage->getColumns().getAliases();
const auto & storage_virtuals = storage->getColumns().getVirtuals();
source_columns.insert(source_columns.end(), storage_aliases.begin(), storage_aliases.end());
source_columns.insert(source_columns.end(), storage_virtuals.begin(), storage_virtuals.end());
}
}
}

View File

@ -246,15 +246,24 @@ NamesAndTypesList ColumnsDescription::getMaterialized() const
return ret;
}
NamesAndTypesList ColumnsDescription::getAliasesAndVirtuals() const
NamesAndTypesList ColumnsDescription::getAliases() const
{
NamesAndTypesList ret;
for (const auto & col : columns)
if (col.default_desc.kind == ColumnDefaultKind::Alias || col.is_virtual)
if (col.default_desc.kind == ColumnDefaultKind::Alias)
ret.emplace_back(col.name, col.type);
return ret;
}
NamesAndTypesList ColumnsDescription::getVirtuals() const
{
NamesAndTypesList result;
for (const auto & column : columns)
if (column.is_virtual)
result.emplace_back(column.name, column.type);
return result;
}
NamesAndTypesList ColumnsDescription::getAll() const
{
NamesAndTypesList ret;

View File

@ -67,7 +67,8 @@ public:
NamesAndTypesList getOrdinary() const;
NamesAndTypesList getMaterialized() const;
NamesAndTypesList getAliasesAndVirtuals() const;
NamesAndTypesList getAliases() const;
NamesAndTypesList getVirtuals() const;
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + virtuals.

View File

@ -16,6 +16,8 @@ KafkaBlockInputStream::KafkaBlockInputStream(
if (!storage.getSchemaName().empty())
context.setSetting("format_schema", storage.getSchemaName());
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns();
}
KafkaBlockInputStream::~KafkaBlockInputStream()
@ -50,8 +52,16 @@ void KafkaBlockInputStream::readPrefixImpl()
rows_portion_size = std::max(rows_portion_size, 1ul);
auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
auto buffer_callback = [this]
{
const auto * sub_buffer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>();
virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(sub_buffer->currentKey()); // "key"
virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset"
};
auto child = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size);
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size, buffer_callback);
child->setLimits(limits);
addChild(child);

View File

@ -28,6 +28,7 @@ private:
UInt64 max_block_size;
BufferPtr buffer;
MutableColumns virtual_columns;
bool broken = true, claimed = false;
};

View File

@ -35,9 +35,9 @@ public:
auto pollTimeout() { return poll_timeout; }
// Return values for the message that's being read.
String currentTopic() { return current[-1].get_topic(); }
String currentKey() { return current[-1].get_key(); }
auto currentOffset() { return current[-1].get_offset(); }
String currentTopic() const { return current[-1].get_topic(); }
String currentKey() const { return current[-1].get_key(); }
auto currentOffset() const { return current[-1].get_offset(); }
private:
using Messages = std::vector<cppkafka::Message>;