[WIP] refactoring

Ivan Lezhankin 2019-05-22 22:38:43 +03:00
parent cf39c4cc47
commit dd906eabdc
8 changed files with 108 additions and 83 deletions

View File

@@ -1,8 +1,8 @@
 #pragma once
+#include <cstdint>
 #include <string>
 #include <vector>
-#include <cstdint>
 namespace DB

View File

@@ -68,8 +68,8 @@ public:
     NamesAndTypesList getOrdinary() const;
     NamesAndTypesList getMaterialized() const;
     NamesAndTypesList getAliasesAndVirtuals() const;
-    /// ordinary + materialized + aliases + virtuals.
-    NamesAndTypesList getAll() const;
+    NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
+    NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + virtuals.
     using ColumnTTLs = std::unordered_map<String, ASTPtr>;
     ColumnTTLs getColumnTTLs() const;
@@ -88,8 +88,6 @@ public:
             throw Exception("Cannot modify ColumnDescription for column " + column_name + ": column name cannot be changed", ErrorCodes::LOGICAL_ERROR);
     }
-    /// ordinary + materialized.
-    NamesAndTypesList getAllPhysical() const;
     Names getNamesOfPhysical() const;
     bool hasPhysical(const String & column_name) const;
     NameAndTypePair getPhysical(const String & column_name) const;
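The comment shuffle above encodes the split the rest of this refactoring relies on: "physical" columns are ordinary + materialized, while getAll() additionally includes aliases and virtuals. Below is a minimal self-contained sketch of that split; MockColumns and the example column names (including the virtual-looking _topic/_offset) are illustrative stand-ins, not the real ColumnsDescription API.

// Simplified stand-in for the ordinary/materialized/alias/virtual split spelled
// out by the trailing comments above; not the actual ColumnsDescription class.
#include <iostream>
#include <string>
#include <vector>

struct MockColumns
{
    std::vector<std::string> ordinary{"key", "value"};
    std::vector<std::string> materialized{"value_length"};
    std::vector<std::string> aliases_and_virtuals{"_topic", "_offset"};

    // ordinary + materialized.
    std::vector<std::string> getAllPhysical() const
    {
        auto result = ordinary;
        result.insert(result.end(), materialized.begin(), materialized.end());
        return result;
    }

    // ordinary + materialized + aliases + virtuals.
    std::vector<std::string> getAll() const
    {
        auto result = getAllPhysical();
        result.insert(result.end(), aliases_and_virtuals.begin(), aliases_and_virtuals.end());
        return result;
    }
};

int main()
{
    MockColumns columns;
    std::cout << "physical: " << columns.getAllPhysical().size()
              << ", all: " << columns.getAll().size() << '\n';  // physical: 3, all: 5
}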

View File

@@ -91,9 +91,9 @@ public: /// thread-unsafe part. lockStructure must be acquired
     virtual NameAndTypePair getColumn(const String & column_name) const;
     virtual bool hasColumn(const String & column_name) const;
-    Block getSampleBlock() const;
-    Block getSampleBlockNonMaterialized() const;
-    Block getSampleBlockForColumns(const Names & column_names) const; /// including virtual and alias columns.
+    Block getSampleBlock() const; /// ordinary + materialized.
+    Block getSampleBlockNonMaterialized() const; /// ordinary.
+    Block getSampleBlockForColumns(const Names & column_names) const; /// ordinary + materialized + aliases + virtuals.
     /// Verify that all the requested names are in the table and are set correctly:
     /// list of names is not empty and the names do not repeat.
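The verification mentioned in that trailing comment boils down to two conditions: the requested list is non-empty and no name repeats. A rough self-contained sketch of such a check follows; checkRequestedNames and the exception messages are assumptions for illustration, not the storage layer's actual code.

// Illustrative check: the requested column list must be non-empty and
// free of duplicates. A simplified stand-in, not IStorage's implementation.
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

void checkRequestedNames(const std::vector<std::string> & names)
{
    if (names.empty())
        throw std::invalid_argument("Empty list of columns requested");

    std::set<std::string> seen;
    for (const auto & name : names)
        if (!seen.insert(name).second)
            throw std::invalid_argument("Column " + name + " is requested more than once");
}

int main()
{
    checkRequestedNames({"key", "value"});   // passes
    // checkRequestedNames({"key", "key"});  // would throw: requested more than once
}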

View File

@@ -7,15 +7,15 @@ namespace DB
 {
 KafkaBlockInputStream::KafkaBlockInputStream(
-    StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
-    : storage(storage_), context(context_), max_block_size(max_block_size_)
+    StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_)
+    : storage(storage_), context(context_), column_names(columns), max_block_size(max_block_size_)
 {
     context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
     context.setSetting("input_format_allow_errors_ratio", 0.);
-    context.setSetting("input_format_allow_errors_num", storage.skip_broken);
+    context.setSetting("input_format_allow_errors_num", storage.skipBroken());
-    if (!schema.empty())
-        context.setSetting("format_schema", schema);
+    if (!storage.getSchemaName().empty())
+        context.setSetting("format_schema", storage.getSchemaName());
 }
 KafkaBlockInputStream::~KafkaBlockInputStream()
@@ -29,6 +29,11 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
     storage.pushBuffer(buffer);
 }
+Block KafkaBlockInputStream::getHeader() const
+{
+    return storage.getSampleBlockForColumns(column_names);
+}
 void KafkaBlockInputStream::readPrefixImpl()
 {
     buffer = storage.tryClaimBuffer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
@@ -37,20 +42,30 @@ void KafkaBlockInputStream::readPrefixImpl()
     if (!buffer)
         buffer = storage.createBuffer();
-    buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.topics);
+    buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.getTopics());
     const auto & limits = getLimits();
     const size_t poll_timeout = buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->pollTimeout();
     size_t rows_portion_size = poll_timeout ? std::min(max_block_size, limits.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
     rows_portion_size = std::max(rows_portion_size, 1ul);
-    auto child = FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size, rows_portion_size);
+    auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
+    auto child = FormatFactory::instance().getInput(
+        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size);
     child->setLimits(limits);
     addChild(child);
     broken = true;
 }
+Block KafkaBlockInputStream::readImpl()
+{
+    /// FIXME: materialize MATERIALIZED columns here.
+    Block block = children.back()->read();
+    /// TODO: add virtual columns here
+    return block;
+}
 void KafkaBlockInputStream::readSuffixImpl()
 {
     buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
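The rows_portion_size computed in readPrefixImpl caps how many rows a single portion may ask for: with a non-zero poll timeout, reading max_block_size rows must still fit into the stream's max_execution_time, and the portion never drops below one row. A standalone arithmetic sketch, with made-up example values for the timeout and limits:

// Standalone sketch of the rows_portion_size formula from readPrefixImpl().
// The concrete numbers are assumed example values, not ClickHouse defaults.
#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    const size_t max_block_size = 65536;
    const size_t max_execution_time_ms = 5000;  // from the stream's limits
    const size_t poll_timeout_ms = 500;         // from ReadBufferFromKafkaConsumer::pollTimeout()

    size_t rows_portion_size = poll_timeout_ms
        ? std::min(max_block_size, max_execution_time_ms / poll_timeout_ms)
        : max_block_size;
    rows_portion_size = std::max<size_t>(rows_portion_size, 1);  // never ask for zero rows

    std::cout << rows_portion_size << '\n';  // prints 10
}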

View File

@@ -11,19 +11,20 @@ namespace DB
 class KafkaBlockInputStream : public IBlockInputStream
 {
 public:
-    KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_);
+    KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_);
     ~KafkaBlockInputStream() override;
     String getName() const override { return storage.getName(); }
-    Block readImpl() override { return children.back()->read(); }
-    Block getHeader() const override { return storage.getSampleBlock(); }
+    Block getHeader() const override;
     void readPrefixImpl() override;
+    Block readImpl() override;
     void readSuffixImpl() override;
 private:
     StorageKafka & storage;
     Context context;
+    Names column_names;
     UInt64 max_block_size;
     BufferPtr buffer;
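With getHeader() and readImpl() declared out of line, the stream's header is now derived from the requested column_names instead of the storage-wide sample block. The sketch below only illustrates the prefix / read-loop / suffix call order such a stream goes through; MinimalStream is a toy stand-in, not the IBlockInputStream interface.

// Toy stand-in showing the lifecycle KafkaBlockInputStream participates in:
// getHeader() describes the columns, then readPrefix / read ... / readSuffix.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

class MinimalStream
{
public:
    explicit MinimalStream(std::vector<std::string> column_names_) : column_names(std::move(column_names_)) {}

    const std::vector<std::string> & getHeader() const { return column_names; }  // depends on the requested columns

    void readPrefix() { std::cout << "subscribe to topics\n"; }
    bool read(size_t i) { std::cout << "read block " << i << '\n'; return i + 1 < 3; }  // pretend there are 3 blocks
    void readSuffix() { std::cout << "commit offsets\n"; }

private:
    std::vector<std::string> column_names;
};

int main()
{
    MinimalStream stream({"key", "value"});
    std::cout << stream.getHeader().size() << " columns in the header\n";

    stream.readPrefix();
    for (size_t i = 0; stream.read(i); ++i) {}
    stream.readSuffix();
}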

View File

@@ -1,6 +1,7 @@
 #pragma once
+#include <Core/Names.h>
 #include <Core/Types.h>
 #include <IO/DelimitedReadBuffer.h>
 #include <common/logger_useful.h>
@@ -33,6 +34,11 @@ public:
     auto pollTimeout() { return poll_timeout; }
+    // Return values for the message that's being read.
+    String currentTopic() { return current[-1].get_topic(); }
+    String currentKey() { return current[-1].get_key(); }
+    auto currentOffset() { return current[-1].get_offset(); }
 private:
     using Messages = std::vector<cppkafka::Message>;
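The new accessors read fields of the message that was consumed most recently: current is an iterator into the polled batch, so current[-1] is the element just before it, i.e. *(current - 1). A minimal stand-in of the idiom with a plain vector; Message here is a dummy struct, not cppkafka::Message.

// Stand-in for the current[-1] idiom: once the iterator has advanced past a
// message, it[-1] refers to the message that was just consumed.
#include <iostream>
#include <string>
#include <vector>

struct Message  // dummy type, not cppkafka::Message
{
    std::string topic;
    std::string key;
    long offset;
};

int main()
{
    std::vector<Message> batch{{"some_topic", "k1", 42}, {"some_topic", "k2", 43}};
    auto current = batch.begin();

    ++current;  // the first message has been read

    // Same access pattern as currentTopic()/currentKey()/currentOffset().
    std::cout << current[-1].topic << ' ' << current[-1].key << ' ' << current[-1].offset << '\n';
    // prints: some_topic k1 42
}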

View File

@@ -108,7 +108,7 @@ StorageKafka::StorageKafka(
 BlockInputStreams StorageKafka::read(
-    const Names & /* column_names */,
+    const Names & column_names,
     const SelectQueryInfo & /* query_info */,
     const Context & context,
     QueryProcessingStage::Enum /* processed_stage */,
@@ -127,8 +127,8 @@ BlockInputStreams StorageKafka::read(
     for (size_t i = 0; i < stream_count; ++i)
     {
         /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
-        /// TODO: that leads to awful performance.
-        streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, 1));
+        /// TODO: probably that leads to awful performance.
+        streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1));
     }
     LOG_DEBUG(log, "Starting reading " << streams.size() << " streams");
@@ -182,46 +182,6 @@ void StorageKafka::updateDependencies()
 }
-cppkafka::Configuration StorageKafka::createConsumerConfiguration()
-{
-    cppkafka::Configuration conf;
-    LOG_TRACE(log, "Setting brokers: " << brokers);
-    conf.set("metadata.broker.list", brokers);
-    LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
-    conf.set("group.id", group);
-    conf.set("client.id", VERSION_FULL);
-    // If no offset stored for this group, read all messages from the start
-    conf.set("auto.offset.reset", "smallest");
-    // We manually commit offsets after a stream successfully finished
-    conf.set("enable.auto.commit", "false");
-    // Ignore EOF messages
-    conf.set("enable.partition.eof", "false");
-    // for debug logs inside rdkafka
-    // conf.set("debug", "consumer,cgrp,topic,fetch");
-    // Update consumer configuration from the configuration
-    const auto & config = global_context.getConfigRef();
-    if (config.has(CONFIG_PREFIX))
-        loadFromConfig(conf, config, CONFIG_PREFIX);
-    // Update consumer topic-specific configuration
-    for (const auto & topic : topics)
-    {
-        const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
-        if (config.has(topic_config_key))
-            loadFromConfig(conf, config, topic_config_key);
-    }
-    return conf;
-}
 BufferPtr StorageKafka::createBuffer()
 {
     // Create a consumer and subscribe to topics
@@ -269,6 +229,47 @@ void StorageKafka::pushBuffer(BufferPtr buffer)
     semaphore.set();
 }
+cppkafka::Configuration StorageKafka::createConsumerConfiguration()
+{
+    cppkafka::Configuration conf;
+    LOG_TRACE(log, "Setting brokers: " << brokers);
+    conf.set("metadata.broker.list", brokers);
+    LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
+    conf.set("group.id", group);
+    conf.set("client.id", VERSION_FULL);
+    // If no offset stored for this group, read all messages from the start
+    conf.set("auto.offset.reset", "smallest");
+    // We manually commit offsets after a stream successfully finished
+    conf.set("enable.auto.commit", "false");
+    // Ignore EOF messages
+    conf.set("enable.partition.eof", "false");
+    // for debug logs inside rdkafka
+    // conf.set("debug", "consumer,cgrp,topic,fetch");
+    // Update consumer configuration from the configuration
+    const auto & config = global_context.getConfigRef();
+    if (config.has(CONFIG_PREFIX))
+        loadFromConfig(conf, config, CONFIG_PREFIX);
+    // Update consumer topic-specific configuration
+    for (const auto & topic : topics)
+    {
+        const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
+        if (config.has(topic_config_key))
+            loadFromConfig(conf, config, topic_config_key);
+    }
+    return conf;
+}
 bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name)
 {
     // Check if all dependencies are attached
@@ -344,12 +345,16 @@ bool StorageKafka::streamToViews()
     if (block_size == 0)
         block_size = settings.max_block_size.value;
+    // Execute the query
+    InterpreterInsertQuery interpreter{insert, global_context};
+    auto block_io = interpreter.execute();
     // Create a stream for each consumer and join them in a union stream
     BlockInputStreams streams;
     streams.reserve(num_created_consumers);
     for (size_t i = 0; i < num_created_consumers; ++i)
     {
-        auto stream = std::make_shared<KafkaBlockInputStream>(*this, global_context, schema_name, block_size);
+        auto stream = std::make_shared<KafkaBlockInputStream>(*this, global_context, block_io.out->getHeader().getNames(), block_size);
         streams.emplace_back(stream);
         // Limit read batch to maximum block size to allow DDL
@@ -366,9 +371,6 @@ bool StorageKafka::streamToViews()
     else
         in = streams[0];
-    // Execute the query
-    InterpreterInsertQuery interpreter{insert, global_context};
-    auto block_io = interpreter.execute();
     copyData(*in, *block_io.out, &stream_cancelled);
     // Check whether the limits were applied during query execution
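The streamToViews() hunks move the InterpreterInsertQuery setup ahead of stream construction, so each KafkaBlockInputStream can be built from the column names of the insert's output header (block_io.out->getHeader().getNames()) rather than from the schema name. A minimal sketch of why the ordering matters; DummySink and DummySource are illustrative stand-ins, not ClickHouse classes.

// Minimal stand-in for the reordering: the destination's header names have to
// exist before the source streams that should read exactly those columns.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct DummySink  // plays the role of block_io.out
{
    std::vector<std::string> getHeaderNames() const { return {"key", "value"}; }
};

struct DummySource  // plays the role of KafkaBlockInputStream
{
    explicit DummySource(std::vector<std::string> names) : column_names(std::move(names)) {}
    std::vector<std::string> column_names;
};

int main()
{
    DummySink sink;  // created first, as the interpreter now is
    std::vector<DummySource> sources;
    for (size_t i = 0; i < 2; ++i)
        sources.emplace_back(sink.getHeaderNames());  // sources read exactly the sink's columns

    std::cout << sources.size() << " sources over "
              << sink.getHeaderNames().size() << " columns\n";
}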

View File

@@ -20,9 +20,6 @@ namespace DB
  */
 class StorageKafka : public ext::shared_ptr_helper<StorageKafka>, public IStorage
 {
-    friend class KafkaBlockInputStream;
-    friend class KafkaBlockOutputStream;
 public:
     std::string getName() const override { return "Kafka"; }
     std::string getTableName() const override { return table_name; }
@@ -43,6 +40,27 @@ public:
     void updateDependencies() override;
+    BufferPtr createBuffer();
+    BufferPtr claimBuffer();
+    BufferPtr tryClaimBuffer(long wait_ms);
+    void pushBuffer(BufferPtr buf);
+    const auto & getTopics() const { return topics; }
+    const auto & getFormatName() const { return format_name; }
+    const auto & getSchemaName() const { return schema_name; }
+    const auto & skipBroken() const { return skip_broken; }
+protected:
+    StorageKafka(
+        const std::string & table_name_,
+        const std::string & database_name_,
+        Context & context_,
+        const ColumnsDescription & columns_,
+        const String & brokers_, const String & group_, const Names & topics_,
+        const String & format_name_, char row_delimiter_, const String & schema_name_,
+        size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
+        bool intermediate_commit_);
 private:
     // Configuration and state
     String table_name;
@@ -77,25 +95,10 @@ private:
     std::atomic<bool> stream_cancelled{false};
     cppkafka::Configuration createConsumerConfiguration();
-    BufferPtr createBuffer();
-    BufferPtr claimBuffer();
-    BufferPtr tryClaimBuffer(long wait_ms);
-    void pushBuffer(BufferPtr buf);
     void streamThread();
     bool streamToViews();
     bool checkDependencies(const String & database_name, const String & table_name);
-protected:
-    StorageKafka(
-        const std::string & table_name_,
-        const std::string & database_name_,
-        Context & context_,
-        const ColumnsDescription & columns_,
-        const String & brokers_, const String & group_, const Names & topics_,
-        const String & format_name_, char row_delimiter_, const String & schema_name_,
-        size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
-        bool intermediate_commit_);
 };
 }
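Dropping the friend declarations works because everything KafkaBlockInputStream used to reach into directly is now public: the buffer methods plus the getTopics/getFormatName/getSchemaName/skipBroken accessors. A simplified stand-in of the pattern; ExampleStorage is hypothetical and far smaller than StorageKafka.

// Simplified stand-in for replacing friendship with small const accessors.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

class ExampleStorage
{
public:
    const auto & getTopics() const { return topics; }
    const auto & getFormatName() const { return format_name; }
    const auto & skipBroken() const { return skip_broken; }

private:
    // State stays private; readers go through the accessors above
    // instead of being declared friends of the class.
    std::vector<std::string> topics{"events"};
    std::string format_name{"JSONEachRow"};
    size_t skip_broken = 0;
};

int main()
{
    ExampleStorage storage;
    std::cout << storage.getFormatName() << " over " << storage.getTopics().size()
              << " topic(s), skip_broken = " << storage.skipBroken() << '\n';
}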