Mirror of https://github.com/ClickHouse/ClickHouse.git
Commit cf194edfaf
IBlockInputStream.h
@@ -263,6 +263,11 @@ protected:
     */
    bool checkTimeLimit();
 
+#ifndef NDEBUG
+    bool read_prefix_is_called = false;
+    bool read_suffix_is_called = false;
+#endif
+
 private:
    bool enabled_extremes = false;
 
@@ -315,10 +320,6 @@ private:
            return;
        }
 
-#ifndef NDEBUG
-    bool read_prefix_is_called = false;
-    bool read_suffix_is_called = false;
-#endif
 };
 
 }
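The two hunks above are a single change: the NDEBUG-only bookkeeping flags move from the private section of IBlockInputStream to the protected one, so that derived streams can reset them. A minimal sketch of the resulting shape (the class names and the atomics shown here are illustrative stand-ins, not the real definitions):

    #include <atomic>

    class IBlockInputStreamSketch
    {
    protected:
        // Debug-only bookkeeping: protected rather than private, so that a
        // subclass can reset it when it re-parses a new input.
    #ifndef NDEBUG
        bool read_prefix_is_called = false;
        bool read_suffix_is_called = false;
    #endif
        std::atomic<bool> is_cancelled{false};
        std::atomic<bool> is_killed{false};
    };

    class NativeBlockInputStreamSketch : public IBlockInputStreamSketch
    {
    public:
        void resetParser()
        {
            // Legal only because the flags are protected, not private.
    #ifndef NDEBUG
            read_prefix_is_called = false;
            read_suffix_is_called = false;
    #endif
            is_cancelled.store(false);
            is_killed.store(false);
        }
    };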
NativeBlockInputStream.cpp
@@ -57,12 +57,19 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
        }
    }
 
+// Also resets a few vars from IBlockInputStream (resetParser was deliberately not propagated up there).
 void NativeBlockInputStream::resetParser()
 {
    istr_concrete = nullptr;
    use_index = false;
    header.clear();
    avg_value_size_hints.clear();
+
+#ifndef NDEBUG
+    read_prefix_is_called = false;
+    read_suffix_is_called = false;
+#endif
+
    is_cancelled.store(false);
    is_killed.store(false);
 }
 
 void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
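The diff only shows the flags being declared and reset, not consumed. A guess at how they are used elsewhere: readPrefix()/readSuffix() presumably set them, so debug builds can assert that a stream is not re-read without resetParser() in between. Everything in this sketch, including the struct and method bodies, is hypothetical:

    #include <cassert>

    struct PrefixSuffixGuardSketch
    {
    #ifndef NDEBUG
        bool read_prefix_is_called = false;
        bool read_suffix_is_called = false;
    #endif

        void readPrefix()
        {
    #ifndef NDEBUG
            // Fires if the stream is reused without resetParser() in between.
            assert(!read_prefix_is_called);
            read_prefix_is_called = true;
    #endif
            // ... real prefix handling would go here ...
        }

        void readSuffix()
        {
    #ifndef NDEBUG
            assert(!read_suffix_is_called);
            read_suffix_is_called = true;
    #endif
            // ... real suffix handling would go here ...
        }
    };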
KafkaBlockInputStream.cpp
@@ -6,7 +6,6 @@
 #include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
-#include <Processors/Formats/InputStreamFromInputFormat.h>
 
 
 namespace DB
 {
 KafkaBlockInputStream::KafkaBlockInputStream(
@@ -66,20 +65,8 @@ Block KafkaBlockInputStream::readImpl()
    MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
 
-    auto read_callback = [&]
-    {
-        virtual_columns[0]->insert(buffer->currentTopic());     // "topic"
-        virtual_columns[1]->insert(buffer->currentKey());       // "key"
-        virtual_columns[2]->insert(buffer->currentOffset());    // "offset"
-        virtual_columns[3]->insert(buffer->currentPartition()); // "partition"
-
-        auto timestamp = buffer->currentTimestamp();
-        if (timestamp)
-            virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
-    };
-
    auto input_format = FormatFactory::instance().getInputFormat(
-        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
+        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
 
    InputPort port(input_format->getPort().getHeader(), input_format.get());
    connect(input_format->getPort(), port);
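The removed read_callback asked the consumer for topic/key/offset/partition once per parsed row, from inside the format. The replacement (in the last hunk below) fetches the metadata of the current message once and replicates it across all rows parsed from that message. A standalone sketch of that bulk fill, with simplified stand-in types instead of the real IColumn machinery:

    #include <cstdint>
    #include <string>
    #include <vector>

    // Stand-in for the per-message metadata the Kafka consumer exposes.
    struct MessageMetaSketch
    {
        std::string topic;
        std::string key;
        int64_t offset = 0;
        int32_t partition = 0;
    };

    // Fetch once, insert new_rows times: every row parsed from the same
    // message carries identical virtual-column values.
    void fillVirtualColumnsSketch(
        const MessageMetaSketch & meta,
        size_t new_rows,
        std::vector<std::string> & topic_col,
        std::vector<int64_t> & offset_col)
    {
        for (size_t i = 0; i < new_rows; ++i)
        {
            topic_col.push_back(meta.topic);
            offset_col.push_back(meta.offset);
        }
    }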
@@ -106,13 +93,17 @@ Block KafkaBlockInputStream::readImpl()
            case IProcessor::Status::PortFull:
            {
                auto chunk = port.pull();
-                new_rows = new_rows + chunk.getNumRows();
 
                /// FIXME: materialize MATERIALIZED columns here.
+
+                // this was returning a bad value before https://github.com/ClickHouse/ClickHouse/pull/8005;
+                // if backported, it should go together with #8005
+                auto chunk_rows = chunk.getNumRows();
+                new_rows += chunk_rows;
 
                auto columns = chunk.detachColumns();
                for (size_t i = 0, s = columns.size(); i < s; ++i)
                {
                    result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
                }
                break;
            }
            case IProcessor::Status::NeedData:
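For context, the PortFull case above lives inside a status-driven pump: the format processor is driven manually until it fills its output port, and each pulled chunk's rows are appended to the block. A freestanding, runnable miniature of that control flow (MiniFormatSketch only mimics the shape of IProcessor::prepare(); it is not the real API):

    #include <cstddef>
    #include <cstdio>

    enum class Status { PortFull, Finished };

    struct MiniFormatSketch
    {
        int pending_chunks = 3; // pretend three chunks are buffered

        Status prepare()
        {
            return pending_chunks > 0 ? Status::PortFull : Status::Finished;
        }

        size_t pull() // hand over one "chunk" and report its row count
        {
            --pending_chunks;
            return 10;
        }
    };

    int main()
    {
        MiniFormatSketch format;
        size_t new_rows = 0;

        // Same shape as the switch in readImpl: keep asking the processor
        // what it wants; when the output port is full, pull the chunk and
        // count its rows.
        while (true)
        {
            switch (format.prepare())
            {
                case Status::PortFull:
                    new_rows += format.pull();
                    break;
                case Status::Finished:
                    std::printf("rows accumulated: %zu\n", new_rows);
                    return 0;
            }
        }
    }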
@@ -125,18 +116,55 @@ Block KafkaBlockInputStream::readImpl()
    };
 
    size_t total_rows = 0;
-    while (total_rows < max_block_size)
+    while (true)
    {
+        // some formats (like RowBinaryWithNamesAndTypes / CSVWithNames)
+        // throw an exception from readPrefix when the buffer is empty
+        if (buffer->eof())
+            break;
+
        auto new_rows = read_kafka_message();
+
+        auto _topic = buffer->currentTopic();
+        auto _key = buffer->currentKey();
+        auto _offset = buffer->currentOffset();
+        auto _partition = buffer->currentPartition();
+        auto _timestamp_raw = buffer->currentTimestamp();
+        auto _timestamp = _timestamp_raw ? std::chrono::duration_cast<std::chrono::seconds>(_timestamp_raw->get_timestamp()).count()
+                                         : 0;
+        for (size_t i = 0; i < new_rows; ++i)
+        {
+            virtual_columns[0]->insert(_topic);
+            virtual_columns[1]->insert(_key);
+            virtual_columns[2]->insert(_offset);
+            virtual_columns[3]->insert(_partition);
+            if (_timestamp_raw)
+            {
+                virtual_columns[4]->insert(_timestamp);
+            }
+            else
+            {
+                virtual_columns[4]->insertDefault();
+            }
+        }
+
        total_rows = total_rows + new_rows;
        buffer->allowNext();
-        if (!new_rows || !checkTimeLimit())
+        if (!new_rows || total_rows >= max_block_size || !checkTimeLimit())
            break;
    }
 
    if (total_rows == 0)
        return Block();
 
+    /// MATERIALIZED columns could be added here, but they are not needed:
+    /// it is misleading to use them here, as columns 'materialized' that way
+    /// stay 'ephemeral', i.e. are not stored anywhere.
+    /// If extra columns are needed, they can be added at the MV level using DEFAULT.
+
    auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
    auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
 
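The net effect of the loop rewrite: the old form stopped reading once total_rows reached max_block_size before a read, while the new form checks the limit after each whole message, so a message is never split across blocks, and checkTimeLimit() bounds how long a block may accumulate. A distilled, runnable model of the new exit condition (the message source and the time budget below are fakes, not the Kafka consumer or the query's real limit):

    #include <chrono>
    #include <cstddef>
    #include <cstdio>

    // Fake message source: five 10-row messages, then EOF.
    static size_t readOneMessage()
    {
        static int calls = 0;
        return ++calls <= 5 ? 10 : 0;
    }

    static size_t accumulateBlock(size_t max_block_size, std::chrono::steady_clock::time_point deadline)
    {
        size_t total_rows = 0;
        while (true)
        {
            const size_t new_rows = readOneMessage();
            total_rows += new_rows;
            // The size check happens *after* the message is fully consumed,
            // so a block may slightly overshoot max_block_size but never
            // splits a message across two blocks.
            if (!new_rows || total_rows >= max_block_size || std::chrono::steady_clock::now() >= deadline)
                break;
        }
        return total_rows;
    }

    int main()
    {
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
        std::printf("rows in block: %zu\n", accumulateBlock(25, deadline)); // prints 30: three whole messages
    }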