execute asynchronous inserts separately for each client

Anton Popov 2021-08-27 06:00:12 +03:00
parent 6d3274c22c
commit e8ac8e3454
35 changed files with 489 additions and 477 deletions
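
In outline, this commit replaces the old flush path (a full INSERT pipeline built per batch) with a single set of sinks per query plus one parsing pass per client buffer. Below is a condensed sketch of the new flush logic, using only identifiers that appear in the diffs that follow; locking, the `on_error` callback, and the final pipeline wiring are omitted here and shown in the AsynchronousInsertQueue diff itself:

```cpp
// Condensed from the new AsynchronousInsertQueue flush logic in this commit.
// One sink is built per flushed query; each client's bytes are parsed
// separately, so a malformed buffer fails only that client's query_id.
InterpreterInsertQuery interpreter(data->query, insert_context,
    data->settings.insert_allow_materialized_columns, false);
auto sinks = interpreter.getSinks();                    // exactly one sink
auto header = sinks.at(0)->getInputs().front().getHeader();
auto format = getInputFormatFromASTInsertQuery(
    data->query, false, header, insert_context, nullptr);

// on_error (defined in the diff below) truncates partially parsed rows.
StreamingFormatExecutor executor(header, format, std::move(on_error));
size_t total_rows = 0;
for (const auto & [query_id, buffer] : prepared_data)   // one entry per client
{
    format->resetParser();
    format->setReadBuffer(*buffer);
    total_rows += executor.execute();                   // rows from this client
}

auto chunk = Chunk(executor.getResultColumns(), total_rows);
// The chunk is then pushed through the single sink via a
// SourceFromSingleChunk pipeline, exactly as in the diff below.
```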

View File

@ -941,6 +941,7 @@ if (ThreadFuzzer::instance().isEffective())
if (settings.async_insert_threads)
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(
global_context,
settings.async_insert_threads,
settings.async_insert_max_data_size,
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout}));

View File

@ -5,13 +5,16 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/copyData.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/IStorage.h>
namespace DB
@ -19,15 +22,13 @@ namespace DB
struct AsynchronousInsertQueue::InsertData
{
InsertData(ASTPtr query_, const Settings & settings_, const Block & header_)
: query(std::move(query_)), settings(settings_), header(header_)
InsertData(ASTPtr query_, const Settings & settings_)
: query(std::move(query_)), settings(settings_)
{
}
ASTPtr query;
Settings settings;
Block header;
String query_id;
struct Data
{
@ -120,47 +121,17 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
pool.wait();
}
bool AsynchronousInsertQueue::push(
const ASTPtr & query, const Settings & settings, const String & query_id)
{
auto write_lock = lock->getLock(RWLockImpl::Write, String());
InsertQuery key{query, settings};
auto it = queue->find(key);
if (it != queue->end())
{
std::unique_lock<std::mutex> data_lock(it->second->mutex);
if (it->second->is_reset)
return false;
pushImpl(query, query_id, it);
return true;
}
return false;
}
void AsynchronousInsertQueue::push(
const ASTPtr & query, const Settings & settings, const String & query_id, const Block & header)
void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings, const String & query_id)
{
auto write_lock = lock->getLock(RWLockImpl::Write, String());
InsertQuery key{query, settings};
auto it = queue->find(key);
if (it == queue->end())
it = queue->emplace(key, std::make_shared<InsertData>(query, settings, header)).first;
it = queue->emplace(key, std::make_shared<InsertData>(query, settings)).first;
else if (it->second->is_reset)
it->second = std::make_shared<InsertData>(query, settings, header);
else
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Entry for query '{}' already exists and not reset", queryToString(query));
it->second = std::make_shared<InsertData>(query, settings);
std::unique_lock<std::mutex> data_lock(it->second->mutex);
pushImpl(query, query_id, it);
}
void AsynchronousInsertQueue::pushImpl(const ASTPtr & query, const String & query_id, QueueIterator it)
{
auto read_buf = getReadBufferFromASTInsertQuery(query);
/// It's important to read the whole data per query as a single chunk, so we can safely drop it in case of parsing failure.
@ -238,27 +209,7 @@ try
if (data->is_reset)
return;
// ReadBuffers read_buffers;
// for (const auto & datum : data->data)
// read_buffers.emplace_back(std::make_unique<ReadBufferFromString>(datum));
// auto insert_context = Context::createCopy(global_context);
// insert_context->makeQueryContext();
// insert_context->setSettings(data->settings);
// InterpreterInsertQuery interpreter(data->query, std::move(read_buffers), insert_context);
// auto io = interpreter.execute();
// assert(io.pipeline.initialized());
// auto log_progress = [&](const Progress & progress)
// {
// LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"),
// "Flushed {} rows, {} bytes", progress.written_rows, progress.written_bytes);
// };
// io.pipeline.setProgressCallback(log_progress);
// auto executor = io.pipeline.execute();
// executor->execute(io.pipeline.getNumThreads());
const auto * log = &Poco::Logger::get("AsynchronousInsertQueue");
auto insert_context = Context::createCopy(global_context);
/// 'resetParser' doesn't work for parallel parsing.
@ -266,56 +217,60 @@ try
insert_context->makeQueryContext();
insert_context->setSettings(data->settings);
auto [format, pipe] = getSourceFromASTInsertQuery(data->query, false, data->header, insert_context, nullptr);
InterpreterInsertQuery interpreter(data->query, insert_context, data->settings.insert_allow_materialized_columns, false);
auto sinks = interpreter.getSinks();
assert(sinks.size() == 1);
auto header = sinks.at(0)->getInputs().front().getHeader();
auto format = getInputFormatFromASTInsertQuery(data->query, false, header, insert_context, nullptr);
MutableColumns input_columns = data->header.cloneEmptyColumns();
size_t total_rows = 0;
for (const auto & datum : data->data)
std::string_view current_query_id;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
{
ReadBufferFromString buf(datum.bytes);
format->setReadBuffer(buf);
size_t current_rows = 0;
LOG_ERROR(&Poco::Logger::get("AsynchronousInsertQueue"),
"Failed parsing for query '{}' with query id {}. {}",
queryToString(data->query), current_query_id, e.displayText());
try
{
Chunk chunk;
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
PullingPipelineExecutor executor(pipeline);
while (executor.pull(chunk))
{
assert(chunk.getNumColumns() == input_chunk.getNumColumns());
const auto & columns = chunk.getColumns();
for (size_t i = 0; i < chunk.getNumColumns(); ++i)
input_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
for (const auto & column : result_columns)
if (column->size() > total_rows)
column->popBack(column->size() - total_rows);
current_rows += chunk.getNumRows();
}
}
catch (const Exception & e)
{
LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"),
"Failed query '{}' with query id {}. {}",
queryToString(data->query), datum.query_id, e.displayText());
return 0;
};
for (const auto & column : input_columns)
if (column->size() > total_rows)
column->popBack(column->size() - total_rows);
StreamingFormatExecutor executor(header, format, std::move(on_error));
continue;
}
LOG_INFO(&Poco::Logger::get("AsynchronousInsertQueue"),
"Succeded query '{}' with query id {}",
queryToString(data->query), datum.query_id);
std::vector<std::pair<std::string_view, std::unique_ptr<ReadBuffer>>> prepared_data;
prepared_data.reserve(data->data.size());
for (const auto & datum : data->data)
prepared_data.emplace_back(datum.query_id, std::make_unique<ReadBufferFromString>(datum.bytes));
for (const auto & [query_id, buffer] : prepared_data)
{
format->resetParser();
total_rows += current_rows;
format->setReadBuffer(*buffer);
current_query_id = query_id;
total_rows += executor.execute();
}
std::cerr << "should be inserted query: " << queryToString(data->query) << "\n";
auto block = data->header.cloneWithColumns(std::move(input_columns));
std::cerr << "insert block: " << block.dumpStructure() << "\n";
auto chunk = Chunk(executor.getResultColumns(), total_rows);
size_t total_bytes = chunk.bytes();
auto source = std::make_shared<SourceFromSingleChunk>(header, std::move(chunk));
Pipe pipe(source);
QueryPipeline out_pipeline;
out_pipeline.init(std::move(pipe));
out_pipeline.resize(1);
out_pipeline.setSinks([&](const Block &, Pipe::StreamType) { return sinks.at(0); });
auto out_executor = out_pipeline.execute();
out_executor->execute(out_pipeline.getNumThreads());
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(data->query));
data->reset();
}
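
The `on_error` callback above rolls a failed client's partially parsed rows back by truncating every result column to the last committed row count (`total_rows`), so one client's bad data cannot contaminate the batch. A minimal standalone analogue of that rollback discipline, with hypothetical names and plain `std::vector` columns instead of ClickHouse `IColumn`s:

```cpp
#include <cstdio>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Parse one client's buffer of integers; throws on the first bad token,
// possibly after having appended some rows (a "partial" parse).
static size_t parseInto(std::vector<int> & column, const std::string & buffer)
{
    std::istringstream in(buffer);
    std::string token;
    size_t rows = 0;
    while (in >> token)
    {
        column.push_back(std::stoi(token)); // may throw std::invalid_argument
        ++rows;
    }
    return rows;
}

int main()
{
    std::vector<std::string> client_buffers = {"1 2 3", "4 oops 5", "6"};
    std::vector<int> result_column;
    size_t total_rows = 0; // rows committed by fully parsed clients

    for (const auto & buffer : client_buffers)
    {
        try
        {
            total_rows += parseInto(result_column, buffer);
        }
        catch (const std::exception & e)
        {
            // Same idea as on_error in the diff: drop only this client's
            // partial rows by truncating back to the committed row count.
            result_column.resize(total_rows);
            std::fprintf(stderr, "failed parsing buffer '%s': %s\n",
                         buffer.c_str(), e.what());
        }
    }

    std::printf("flushed %zu rows\n", total_rows); // prints: flushed 4 rows
    return 0;
}
```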

View File

@ -26,8 +26,7 @@ class AsynchronousInsertQueue : public WithContext
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
~AsynchronousInsertQueue();
bool push(const ASTPtr & query, const Settings & settings, const String & query_id);
void push(const ASTPtr & query, const Settings & settings, const String & query_id, const Block & header);
void push(const ASTPtr & query, const Settings & settings, const String & query_id);
private:

View File

@ -147,26 +147,20 @@ static bool isTrivialSelect(const ASTPtr & select)
};
BlockIO InterpreterInsertQuery::execute()
std::pair<BlockIO, Processors> InterpreterInsertQuery::executeImpl(
const StoragePtr & table, Block & sample_block)
{
const Settings & settings = getContext()->getSettingsRef();
auto & query = query_ptr->as<ASTInsertQuery &>();
const auto & settings = getContext()->getSettingsRef();
const auto & query = query_ptr->as<const ASTInsertQuery &>();
auto metadata_snapshot = table->getInMemoryMetadataPtr();
BlockIO res;
StoragePtr table = getTable(query);
if (query.partition_by && !table->supportsPartitionBy())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage");
auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot);
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
BlockIO res;
Processors sinks;
bool is_distributed_insert_select = false;
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
{
// Distributed INSERT SELECT
@ -177,7 +171,6 @@ BlockIO InterpreterInsertQuery::execute()
}
}
BlockOutputStreams out_streams;
if (!is_distributed_insert_select || query.watch)
{
size_t out_streams_size = 1;
@ -245,7 +238,7 @@ BlockIO InterpreterInsertQuery::execute()
if (getContext()->getSettingsRef().insert_null_as_default)
{
const auto & input_columns = res.pipeline.getHeader().getColumnsWithTypeAndName();
const auto & query_columns = query_sample_block.getColumnsWithTypeAndName();
const auto & query_columns = sample_block.getColumnsWithTypeAndName();
const auto & output_columns = metadata_snapshot->getColumns();
if (input_columns.size() == query_columns.size())
@ -255,7 +248,7 @@ BlockIO InterpreterInsertQuery::execute()
/// Change query sample block columns to Nullable to allow inserting nullable columns, where NULL values will be substituted with
/// default column values (in AddingDefaultBlockOutputStream), so all values will be cast correctly.
if (input_columns[col_idx].type->isNullable() && !query_columns[col_idx].type->isNullable() && output_columns.hasDefault(query_columns[col_idx].name))
query_sample_block.setColumn(col_idx, ColumnWithTypeAndName(makeNullable(query_columns[col_idx].column), makeNullable(query_columns[col_idx].type), query_columns[col_idx].name));
sample_block.setColumn(col_idx, ColumnWithTypeAndName(makeNullable(query_columns[col_idx].column), makeNullable(query_columns[col_idx].type), query_columns[col_idx].name));
}
}
}
@ -290,7 +283,7 @@ BlockIO InterpreterInsertQuery::execute()
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, metadata_snapshot->getColumns(), getContext(), null_as_default);
out, sample_block, metadata_snapshot->getColumns(), getContext(), null_as_default);
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficient if block size is small.
@ -310,12 +303,32 @@ BlockIO InterpreterInsertQuery::execute()
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(getContext()->getProcessListElement());
out_streams.emplace_back(std::move(out_wrapper));
sinks.emplace_back(std::make_shared<SinkToOutputStream>(std::move(out_wrapper)));
}
}
return {std::move(res), std::move(sinks)};
}
BlockIO InterpreterInsertQuery::execute()
{
const auto & settings = getContext()->getSettingsRef();
auto & query = query_ptr->as<ASTInsertQuery &>();
auto table = getTable(query);
auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto sample_block = getSampleBlock(query, table, metadata_snapshot);
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames());
BlockIO res;
Processors sinks;
std::tie(res, sinks) = executeImpl(table, sample_block);
/// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
if (is_distributed_insert_select)
if (sinks.empty())
{
/// Pipeline was already built.
}
@ -323,7 +336,7 @@ BlockIO InterpreterInsertQuery::execute()
{
/// XXX: is this branch also triggered for select+input() case?
const auto & header = out_streams.at(0)->getHeader();
const auto & header = sinks.at(0)->getInputs().front().getHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
res.pipeline.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
@ -335,15 +348,13 @@ BlockIO InterpreterInsertQuery::execute()
return std::make_shared<ExpressionTransform>(in_header, actions);
});
res.pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
auto it = sinks.rbegin();
res.pipeline.setSinks([&it](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
{
if (type != QueryPipeline::StreamType::Main)
return nullptr;
auto stream = std::move(out_streams.back());
out_streams.pop_back();
return std::make_shared<SinkToOutputStream>(std::move(stream));
return *it++;
});
if (!allow_materialized)
@ -353,18 +364,16 @@ BlockIO InterpreterInsertQuery::execute()
throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
}
else if (!query.expectNativeData())
else
{
auto pipe = getSourceFromASTInsertQuery(query_ptr, true, query_sample_block, getContext(), nullptr).second;
auto pipe = getSourceFromASTInsertQuery(query_ptr, true, sample_block, getContext(), nullptr);
res.pipeline.init(std::move(pipe));
res.pipeline.resize(1);
res.pipeline.setSinks([&](const Block &, Pipe::StreamType)
{
return std::make_shared<SinkToOutputStream>(out_streams.at(0));
return sinks.at(0);
});
}
else
res.out = std::move(out_streams.at(0));
res.pipeline.addStorageHolder(table);
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))
@ -376,6 +385,21 @@ BlockIO InterpreterInsertQuery::execute()
return res;
}
Processors InterpreterInsertQuery::getSinks()
{
const auto & settings = getContext()->getSettingsRef();
auto & query = query_ptr->as<ASTInsertQuery &>();
auto table = getTable(query);
auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto sample_block = getSampleBlock(query, table, metadata_snapshot);
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames());
return executeImpl(table, sample_block).second;
}
StorageID InterpreterInsertQuery::getDatabaseTable() const
{

View File

@ -10,6 +10,7 @@
namespace DB
{
/** Interprets the INSERT query.
*/
class InterpreterInsertQuery : public IInterpreter, WithContext
@ -28,7 +29,7 @@ public:
* Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result).
*/
BlockIO execute() override;
Processors getSinks();
StorageID getDatabaseTable() const;
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override;
@ -36,6 +37,7 @@ public:
private:
StoragePtr getTable(ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
std::pair<BlockIO, Processors> executeImpl(const StoragePtr & table, Block & sample_block);
ASTPtr query_ptr;
const bool allow_materialized;

View File

@ -23,6 +23,8 @@
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <IO/WriteHelpers.h>
@ -482,11 +484,19 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
insert_context->makeQueryContext();
InterpreterInsertQuery interpreter(query_ptr, insert_context);
BlockIO io = interpreter.execute();
auto sinks = interpreter.getSinks();
assert(sinks.size() == 1);
io.out->writePrefix();
io.out->write(block);
io.out->writeSuffix();
auto chunk = Chunk(block.getColumns(), block.rows());
auto source = std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk));
QueryPipeline pipeline;
pipeline.init(Pipe(source));
pipeline.resize(1);
pipeline.setSinks([&](const Block &, Pipe::StreamType) { return sinks.at(0); });
auto executor = pipeline.execute();
executor->execute(pipeline.getNumThreads());
}
catch (...)
{

View File

@ -540,7 +540,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto & input_storage = dynamic_cast<StorageInput &>(*storage);
auto input_metadata_snapshot = input_storage.getInMemoryMetadataPtr();
auto pipe = getSourceFromASTInsertQuery(
ast, true, input_metadata_snapshot->getSampleBlock(), context, input_function).second;
ast, true, input_metadata_snapshot->getSampleBlock(), context, input_function);
input_storage.setPipe(std::move(pipe));
}
}
@ -553,8 +553,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const bool async_insert
= queue && insert_query && !insert_query->select && !insert_query->expectNativeData() && settings.async_insert_mode;
if (async_insert && queue->push(ast, settings, context->getCurrentQueryId()))
if (async_insert)
{
queue->push(ast, settings, context->getCurrentQueryId());
/// Shortcut for already processed similar insert-queries.
/// Similarity is defined by hashing query text and some settings.
return std::make_tuple(ast, BlockIO());
@ -924,13 +926,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
res.finish_callback = std::move(finish_callback);
res.exception_callback = std::move(exception_callback);
}
if (async_insert)
{
assert(res.pipeline.initialized());
queue->push(ast, settings, context->getCurrentQueryId(), res.pipeline.getHeader());
return std::make_tuple(ast, BlockIO());
}
}
catch (...)
{
@ -1036,7 +1031,7 @@ void executeQuery(
{
if (streams.out)
{
auto pipe = getSourceFromASTInsertQuery(ast, true, streams.out->getHeader(), context, nullptr).second;
auto pipe = getSourceFromASTInsertQuery(ast, true, streams.out->getHeader(), context, nullptr);
pipeline.init(std::move(pipe));
pipeline.resize(1);
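
The "similarity" the shortcut comment refers to is the queue key built as `InsertQuery{query, settings}` in the AsynchronousInsertQueue diff above: inserts with the same query text and parsing-relevant settings share one queue entry. A hedged sketch of what such a key can look like (a simplified analogue; ClickHouse's actual hashing and equality may differ):

```cpp
#include <functional>
#include <string>
#include <unordered_map>

// Hypothetical simplified key: inserts are "similar" when the normalized
// query text and a digest of the relevant settings both match.
struct InsertQueryKey
{
    std::string query_text;     // normalized INSERT statement
    std::string settings_hash;  // digest of settings that affect parsing

    bool operator==(const InsertQueryKey & other) const
    {
        return query_text == other.query_text
            && settings_hash == other.settings_hash;
    }
};

struct InsertQueryKeyHash
{
    size_t operator()(const InsertQueryKey & key) const
    {
        return std::hash<std::string>{}(key.query_text)
             ^ (std::hash<std::string>{}(key.settings_hash) << 1);
    }
};

// Queue shape analogous to the diff's queue of shared InsertData entries:
// std::unordered_map<InsertQueryKey, std::shared_ptr<InsertData>,
//                    InsertQueryKeyHash> queue;
```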

View File

@ -0,0 +1,74 @@
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <iostream>
namespace DB
{
StreamingFormatExecutor::StreamingFormatExecutor(
const Block & header_, InputFormatPtr format_, ErrorCallback on_error_)
: header(header_)
, format(std::move(format_))
, on_error(std::move(on_error_))
, port(format->getPort().getHeader(), format.get())
{
connect(format->getPort(), port);
result_columns = header.cloneEmptyColumns();
}
MutableColumns StreamingFormatExecutor::getResultColumns()
{
auto ret_columns = header.cloneEmptyColumns();
std::swap(ret_columns, result_columns);
return ret_columns;
}
size_t StreamingFormatExecutor::execute()
{
try
{
size_t new_rows = 0;
port.setNeeded();
while (true)
{
auto status = format->prepare();
switch (status)
{
case IProcessor::Status::Ready:
format->work();
break;
case IProcessor::Status::Finished:
format->resetParser();
return new_rows;
case IProcessor::Status::PortFull:
{
auto chunk = port.pull();
auto chunk_rows = chunk.getNumRows();
new_rows += chunk_rows;
auto columns = chunk.detachColumns();
for (size_t i = 0, s = columns.size(); i < s; ++i)
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
break;
}
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:
case IProcessor::Status::ExpandPipeline:
throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
}
}
}
catch (Exception & e)
{
format->resetParser();
return on_error(result_columns, e);
}
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Processors/Formats/IInputFormat.h>
namespace DB
{
class StreamingFormatExecutor
{
public:
using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>;
StreamingFormatExecutor(
const Block & header_,
InputFormatPtr format_,
ErrorCallback on_error_ = [](const MutableColumns &, Exception &) -> size_t { throw; });
size_t execute();
MutableColumns getResultColumns();
private:
Block header;
InputFormatPtr format;
ErrorCallback on_error;
InputPort port;
MutableColumns result_columns;
};
}
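
For reference, the typical driving pattern, mirroring how the AsynchronousInsertQueue diff above uses this class (a sketch of existing usage in this commit, not additional API):

```cpp
// Sketch: feed several per-client read buffers through one shared format.
size_t total_rows = 0;  // rows committed by fully parsed buffers

auto on_error = [&](const MutableColumns & result_columns, Exception & e) -> size_t
{
    // Drop the failed buffer's partial rows, as the commit does.
    for (const auto & column : result_columns)
        if (column->size() > total_rows)
            column->popBack(column->size() - total_rows);
    LOG_ERROR(&Poco::Logger::get("Example"), "Parsing failed: {}", e.displayText());
    return 0;  // contribute no rows for this buffer
};

StreamingFormatExecutor executor(header, format, std::move(on_error));
for (auto & buffer : buffers)          // e.g. one ReadBufferFromString per client
{
    format->resetParser();             // note: not supported for parallel parsing
    format->setReadBuffer(*buffer);
    total_rows += executor.execute();  // appends parsed rows to result columns
}

auto columns = executor.getResultColumns();  // swaps out the accumulated columns
```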

View File

@ -8,15 +8,14 @@ namespace DB
{
IInputFormat::IInputFormat(Block header, ReadBuffer & in_)
: ISource(std::move(header)), in(in_)
: ISource(std::move(header)), in(&in_)
{
column_mapping = std::make_shared<ColumnMapping>();
}
void IInputFormat::resetParser()
{
std::cerr << "resetParser... in: " << demangle(typeid(in).name()) << "\n";
in.ignoreAll();
in->ignoreAll();
// those are protected attributes from ISource (I didn't want to propagate resetParser up there)
finished = false;
got_exception = false;
@ -26,8 +25,7 @@ void IInputFormat::resetParser()
void IInputFormat::setReadBuffer(ReadBuffer & in_)
{
std::cerr << "setReadBuffer... old: " << demangle(typeid(in).name()) << ", new: " << demangle(typeid(in_).name()) << "\n";
in = in_;
in = &in_;
}
}

View File

@ -1,8 +1,7 @@
#pragma once
#include <Processors/ISource.h>
#include <memory>
#include <IO/ReadBuffer.h>
namespace DB
@ -31,8 +30,6 @@ struct ColumnMapping
using ColumnMappingPtr = std::shared_ptr<ColumnMapping>;
class ReadBuffer;
/** Input format is a source, that reads data from ReadBuffer.
*/
class IInputFormat : public ISource
@ -43,7 +40,7 @@ protected:
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wattributes"
ReadBuffer & in [[maybe_unused]];
ReadBuffer * in [[maybe_unused]];
#pragma GCC diagnostic pop

View File

@ -52,7 +52,6 @@ IRowInputFormat::IRowInputFormat(Block header, ReadBuffer & in_, Params params_)
Chunk IRowInputFormat::generate()
{
std::cerr << "in: " << demangle(typeid(in).name()) << "\n";
if (total_rows == 0)
readPrefix();

View File

@ -85,7 +85,7 @@ void ArrowBlockInputFormat::prepareReader()
if (stream)
{
auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(std::make_unique<ArrowInputStreamFromReadBuffer>(in));
auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(std::make_unique<ArrowInputStreamFromReadBuffer>(*in));
if (!stream_reader_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", stream_reader_status.status().ToString());
@ -94,7 +94,7 @@ void ArrowBlockInputFormat::prepareReader()
}
else
{
auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(in));
auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(*in));
if (!file_reader_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", file_reader_status.status().ToString());

View File

@ -599,7 +599,7 @@ AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_,
void AvroRowInputFormat::readPrefix()
{
file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(in));
file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(*in));
deserializer_ptr = std::make_unique<AvroDeserializer>(output.getHeader(), file_reader_ptr->dataSchema(), allow_missing_fields);
file_reader_ptr->init();
}
@ -748,7 +748,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, params_)
, schema_registry(getConfluentSchemaRegistry(format_settings_))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(in))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(*in))
, decoder(avro::binaryDecoder())
, format_settings(format_settings_)
@ -758,16 +758,16 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (in.eof())
if (in->eof())
{
return false;
}
// skip tombstone records (kafka messages with null value)
if (in.available() == 0)
if (in->available() == 0)
{
return false;
}
SchemaId schema_id = readConfluentSchemaId(in);
SchemaId schema_id = readConfluentSchemaId(*in);
const auto & deserializer = getOrCreateDeserializer(schema_id);
deserializer.deserializeRow(columns, *decoder, ext);
decoder->drain();
@ -777,7 +777,7 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten
void AvroConfluentRowInputFormat::syncAfterError()
{
// skip until the end of current kafka message
in.tryIgnore(in.available());
in->tryIgnore(in->available());
}
const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id)

View File

@ -15,12 +15,12 @@ BinaryRowInputFormat::BinaryRowInputFormat(ReadBuffer & in_, Block header, Param
bool BinaryRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in.eof())
if (in->eof())
return false;
size_t num_columns = columns.size();
for (size_t i = 0; i < num_columns; ++i)
serializations[i]->deserializeBinary(*columns[i], in);
serializations[i]->deserializeBinary(*columns[i], *in);
return true;
}
@ -35,14 +35,14 @@ void BinaryRowInputFormat::readPrefix()
if (with_names || with_types)
{
readVarUInt(columns, in);
readVarUInt(columns, *in);
}
if (with_names)
{
for (size_t i = 0; i < columns; ++i)
{
readStringBinary(tmp, in);
readStringBinary(tmp, *in);
}
}
@ -50,7 +50,7 @@ void BinaryRowInputFormat::readPrefix()
{
for (size_t i = 0; i < columns; ++i)
{
readStringBinary(tmp, in);
readStringBinary(tmp, *in);
}
}
}

View File

@ -160,7 +160,7 @@ void CSVRowInputFormat::readPrefix()
{
/// In this format, we assume, that if first string field contain BOM as value, it will be written in quotes,
/// so BOM at beginning of stream cannot be confused with BOM in first string value, and it is safe to skip it.
skipBOMIfExists(in);
skipBOMIfExists(*in);
size_t num_columns = data_types.size();
const auto & header = getPort().getHeader();
@ -179,15 +179,15 @@ void CSVRowInputFormat::readPrefix()
do
{
String column_name;
skipWhitespacesAndTabs(in);
readCSVString(column_name, in, format_settings.csv);
skipWhitespacesAndTabs(in);
skipWhitespacesAndTabs(*in);
readCSVString(column_name, *in, format_settings.csv);
skipWhitespacesAndTabs(*in);
addInputColumn(column_name);
}
while (checkChar(format_settings.csv.delimiter, in));
while (checkChar(format_settings.csv.delimiter, *in));
skipDelimiter(in, format_settings.csv.delimiter, true);
skipDelimiter(*in, format_settings.csv.delimiter, true);
for (auto read_column : column_mapping->read_columns)
{
@ -202,7 +202,7 @@ void CSVRowInputFormat::readPrefix()
}
else
{
skipRow(in, format_settings.csv, num_columns);
skipRow(*in, format_settings.csv, num_columns);
setupAllColumnsByTableSchema();
}
}
@ -213,7 +213,7 @@ void CSVRowInputFormat::readPrefix()
bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (in.eof())
if (in->eof())
return false;
updateDiagnosticInfo();
@ -232,22 +232,22 @@ bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext
if (table_column)
{
skipWhitespacesAndTabs(in);
skipWhitespacesAndTabs(*in);
ext.read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column],
serializations[*table_column], is_last_file_column);
if (!ext.read_columns[*table_column])
have_default_columns = true;
skipWhitespacesAndTabs(in);
skipWhitespacesAndTabs(*in);
}
else
{
/// We never read this column from the file, just skip it.
String tmp;
readCSVString(tmp, in, format_settings.csv);
readCSVString(tmp, *in, format_settings.csv);
}
skipDelimiter(in, delimiter, is_last_file_column);
skipDelimiter(*in, delimiter, is_last_file_column);
}
if (have_default_columns)
@ -277,13 +277,13 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
for (size_t file_column = 0; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
{
if (file_column == 0 && in.eof())
if (file_column == 0 && in->eof())
{
out << "<End of stream>\n";
return false;
}
skipWhitespacesAndTabs(in);
skipWhitespacesAndTabs(*in);
if (column_mapping->column_indexes_for_input_fields[file_column].has_value())
{
const auto & header = getPort().getHeader();
@ -300,26 +300,26 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, file_column))
return false;
}
skipWhitespacesAndTabs(in);
skipWhitespacesAndTabs(*in);
/// Delimiters
if (file_column + 1 == column_mapping->column_indexes_for_input_fields.size())
{
if (in.eof())
if (in->eof())
return false;
/// we support the extra delimiter at the end of the line
if (*in.position() == delimiter)
if (*in->position() == delimiter)
{
++in.position();
if (in.eof())
++in->position();
if (in->eof())
break;
}
if (!in.eof() && *in.position() != '\n' && *in.position() != '\r')
if (!in->eof() && *in->position() != '\n' && *in->position() != '\r')
{
out << "ERROR: There is no line feed. ";
verbosePrintString(in.position(), in.position() + 1, out);
verbosePrintString(in->position(), in->position() + 1, out);
out << " found instead.\n"
" It's like your file has more columns than expected.\n"
"And if your file have right number of columns, maybe it have unquoted string value with comma.\n";
@ -327,17 +327,17 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
return false;
}
skipEndOfLine(in);
skipEndOfLine(*in);
}
else
{
try
{
assertChar(delimiter, in);
assertChar(delimiter, *in);
}
catch (const DB::Exception &)
{
if (*in.position() == '\n' || *in.position() == '\r')
if (*in->position() == '\n' || *in->position() == '\r')
{
out << "ERROR: Line feed found where delimiter (" << delimiter << ") is expected."
" It's like your file has less columns than expected.\n"
@ -346,7 +346,7 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
else
{
out << "ERROR: There is no delimiter (" << delimiter << "). ";
verbosePrintString(in.position(), in.position() + 1, out);
verbosePrintString(in->position(), in->position() + 1, out);
out << " found instead.\n";
}
return false;
@ -360,7 +360,7 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
void CSVRowInputFormat::syncAfterError()
{
skipToNextLineOrEOF(in);
skipToNextLineOrEOF(*in);
}
void CSVRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
@ -374,15 +374,15 @@ void CSVRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn &
else
{
String tmp;
readCSVString(tmp, in, format_settings.csv);
readCSVString(tmp, *in, format_settings.csv);
}
}
bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, bool is_last_file_column)
{
const bool at_delimiter = !in.eof() && *in.position() == format_settings.csv.delimiter;
const bool at_delimiter = !in->eof() && *in->position() == format_settings.csv.delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (in.eof() || *in.position() == '\n' || *in.position() == '\r');
&& (in->eof() || *in->position() == '\n' || *in->position() == '\r');
/// Note: Tuples are serialized in CSV as separate columns, but with empty_as_default or null_as_default
/// only one empty or NULL column will be expected
@ -401,12 +401,12 @@ bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, co
else if (format_settings.null_as_default && !type->isNullable())
{
/// If value is null but type is not nullable then use default value instead.
return SerializationNullable::deserializeTextCSVImpl(column, in, format_settings, serialization);
return SerializationNullable::deserializeTextCSVImpl(column, *in, format_settings, serialization);
}
else
{
/// Read the column normally.
serialization->deserializeTextCSV(column, in, format_settings);
serialization->deserializeTextCSV(column, *in, format_settings);
return true;
}
}

View File

@ -206,7 +206,7 @@ CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header,
kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
{
uint32_t segment_count;
in.readStrict(reinterpret_cast<char*>(&segment_count), sizeof(uint32_t));
in->readStrict(reinterpret_cast<char*>(&segment_count), sizeof(uint32_t));
// one for segmentCount and one because segmentCount starts from 0
const auto prefix_size = (2 + segment_count) * sizeof(uint32_t);
@ -217,7 +217,7 @@ kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
// read size of each segment
for (size_t i = 0; i <= segment_count; ++i)
in.readStrict(prefix_chars.begin() + ((i + 1) * sizeof(uint32_t)), sizeof(uint32_t));
in->readStrict(prefix_chars.begin() + ((i + 1) * sizeof(uint32_t)), sizeof(uint32_t));
// calculate size of message
const auto expected_words = capnp::expectedSizeInWordsFromPrefix(prefix);
@ -228,14 +228,14 @@ kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
// read full message
::memcpy(msg_chars.begin(), prefix_chars.begin(), prefix_size);
in.readStrict(msg_chars.begin() + prefix_size, data_size);
in->readStrict(msg_chars.begin() + prefix_size, data_size);
return msg;
}
bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in.eof())
if (in->eof())
return false;
auto array = readMessage();

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
}
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
IRowInputFormat(header_, in_, std::move(params_)), buf(in)
IRowInputFormat(header_, in_, std::move(params_)), buf(*in)
{
if (header_.columns() > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,

View File

@ -49,33 +49,33 @@ void JSONCompactEachRowRowInputFormat::resetParser()
void JSONCompactEachRowRowInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
skipBOMIfExists(*in);
if (with_names)
{
size_t num_columns = getPort().getHeader().columns();
read_columns.assign(num_columns, false);
assertChar('[', in);
assertChar('[', *in);
do
{
skipWhitespaceIfAny(in);
skipWhitespaceIfAny(*in);
String column_name;
readJSONString(column_name, in);
readJSONString(column_name, *in);
addInputColumn(column_name);
skipWhitespaceIfAny(in);
skipWhitespaceIfAny(*in);
}
while (checkChar(',', in));
assertChar(']', in);
while (checkChar(',', *in));
assertChar(']', *in);
skipEndOfLine();
/// Type checking
assertChar('[', in);
assertChar('[', *in);
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
{
skipWhitespaceIfAny(in);
skipWhitespaceIfAny(*in);
String data_type;
readJSONString(data_type, in);
readJSONString(data_type, *in);
if (column_indexes_for_input_fields[i] &&
data_types[*column_indexes_for_input_fields[i]]->getName() != data_type)
@ -89,10 +89,10 @@ void JSONCompactEachRowRowInputFormat::readPrefix()
}
if (i != column_indexes_for_input_fields.size() - 1)
assertChar(',', in);
skipWhitespaceIfAny(in);
assertChar(',', *in);
skipWhitespaceIfAny(*in);
}
assertChar(']', in);
assertChar(']', *in);
}
else
{
@ -149,14 +149,14 @@ bool JSONCompactEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB::
{
skipEndOfLine();
if (in.eof())
if (in->eof())
return false;
size_t num_columns = columns.size();
read_columns.assign(num_columns, false);
assertChar('[', in);
assertChar('[', *in);
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
const auto & table_column = column_indexes_for_input_fields[file_column];
@ -166,19 +166,19 @@ bool JSONCompactEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB::
}
else
{
skipJSONField(in, StringRef(names_of_columns[file_column]));
skipJSONField(*in, StringRef(names_of_columns[file_column]));
}
skipWhitespaceIfAny(in);
if (in.eof())
skipWhitespaceIfAny(*in);
if (in->eof())
throw ParsingException("Unexpected end of stream while parsing JSONCompactEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
if (file_column + 1 != column_indexes_for_input_fields.size())
{
assertChar(',', in);
skipWhitespaceIfAny(in);
assertChar(',', *in);
skipWhitespaceIfAny(*in);
}
}
assertChar(']', in);
assertChar(']', *in);
for (const auto & name : not_seen_columns)
columns[name]->insertDefault();
@ -189,11 +189,11 @@ bool JSONCompactEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB::
void JSONCompactEachRowRowInputFormat::skipEndOfLine()
{
skipWhitespaceIfAny(in);
if (!in.eof() && (*in.position() == ',' || *in.position() == ';'))
++in.position();
skipWhitespaceIfAny(*in);
if (!in->eof() && (*in->position() == ',' || *in->position() == ';'))
++in->position();
skipWhitespaceIfAny(in);
skipWhitespaceIfAny(*in);
}
void JSONCompactEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)
@ -207,7 +207,7 @@ void JSONCompactEachRowRowInputFormat::readField(size_t index, MutableColumns &
if (yield_strings)
{
String str;
readJSONString(str, in);
readJSONString(str, *in);
ReadBufferFromString buf(str);
@ -219,9 +219,9 @@ void JSONCompactEachRowRowInputFormat::readField(size_t index, MutableColumns &
else
{
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = SerializationNullable::deserializeTextJSONImpl(*columns[index], in, format_settings, serialization);
read_columns[index] = SerializationNullable::deserializeTextJSONImpl(*columns[index], *in, format_settings, serialization);
else
serialization->deserializeTextJSON(*columns[index], in, format_settings);
serialization->deserializeTextJSON(*columns[index], *in, format_settings);
}
}
catch (Exception & e)
@ -233,7 +233,7 @@ void JSONCompactEachRowRowInputFormat::readField(size_t index, MutableColumns &
void JSONCompactEachRowRowInputFormat::syncAfterError()
{
skipToUnescapedNextLineOrEOF(in);
skipToUnescapedNextLineOrEOF(*in);
}
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory)

View File

@ -128,7 +128,7 @@ void JSONEachRowRowInputFormat::skipUnknownField(const StringRef & name_ref)
if (!format_settings.skip_unknown_fields)
throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
skipJSONField(in, name_ref);
skipJSONField(*in, name_ref);
}
void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)
@ -145,7 +145,7 @@ void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns
if (yield_strings)
{
String str;
readJSONString(str, in);
readJSONString(str, *in);
ReadBufferFromString buf(str);
@ -157,9 +157,9 @@ void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns
else
{
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = SerializationNullable::deserializeTextJSONImpl(*columns[index], in, format_settings, serialization);
read_columns[index] = SerializationNullable::deserializeTextJSONImpl(*columns[index], *in, format_settings, serialization);
else
serialization->deserializeTextJSON(*columns[index], in, format_settings);
serialization->deserializeTextJSON(*columns[index], *in, format_settings);
}
}
catch (Exception & e)
@ -171,31 +171,31 @@ void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns
inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index)
{
skipWhitespaceIfAny(in);
skipWhitespaceIfAny(*in);
if (in.eof())
if (in->eof())
throw ParsingException("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
else if (*in.position() == '}')
else if (*in->position() == '}')
{
++in.position();
++in->position();
return false;
}
if (key_index > 0)
{
assertChar(',', in);
skipWhitespaceIfAny(in);
assertChar(',', *in);
skipWhitespaceIfAny(*in);
}
return true;
}
void JSONEachRowRowInputFormat::readJSONObject(MutableColumns & columns)
{
assertChar('{', in);
assertChar('{', *in);
for (size_t key_index = 0; advanceToNextKey(key_index); ++key_index)
{
StringRef name_ref = readColumnName(in);
StringRef name_ref = readColumnName(*in);
const size_t column_index = columnIndex(name_ref, key_index);
if (unlikely(ssize_t(column_index) < 0))
@ -207,7 +207,7 @@ void JSONEachRowRowInputFormat::readJSONObject(MutableColumns & columns)
current_column_name.assign(name_ref.data, name_ref.size);
name_ref = StringRef(current_column_name);
skipColonDelimeter(in);
skipColonDelimeter(*in);
if (column_index == UNKNOWN_FIELD)
skipUnknownField(name_ref);
@ -218,7 +218,7 @@ void JSONEachRowRowInputFormat::readJSONObject(MutableColumns & columns)
}
else
{
skipColonDelimeter(in);
skipColonDelimeter(*in);
readField(column_index, columns);
}
}
@ -238,7 +238,7 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
{
if (!allow_new_rows)
return false;
skipWhitespaceIfAny(in);
skipWhitespaceIfAny(*in);
/// We consume , or \n before scanning a new row, instead scanning to next row at the end.
/// The reason is that if we want an exact number of rows read with LIMIT x
@ -247,25 +247,25 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
/// Semicolon is added for convenience as it could be used at end of INSERT query.
bool is_first_row = getCurrentUnitNumber() == 0 && getTotalRows() == 1;
if (!in.eof())
if (!in->eof())
{
/// There may be optional ',' (but not before the first row)
if (!is_first_row && *in.position() == ',')
++in.position();
else if (!data_in_square_brackets && *in.position() == ';')
if (!is_first_row && *in->position() == ',')
++in->position();
else if (!data_in_square_brackets && *in->position() == ';')
{
/// ';' means the end of query (but it cannot be before ']')
return allow_new_rows = false;
}
else if (data_in_square_brackets && *in.position() == ']')
else if (data_in_square_brackets && *in->position() == ']')
{
/// ']' means the end of query
return allow_new_rows = false;
}
}
skipWhitespaceIfAny(in);
if (in.eof())
skipWhitespaceIfAny(*in);
if (in->eof())
return false;
size_t num_columns = columns.size();
@ -290,7 +290,7 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
void JSONEachRowRowInputFormat::syncAfterError()
{
skipToUnescapedNextLineOrEOF(in);
skipToUnescapedNextLineOrEOF(*in);
}
void JSONEachRowRowInputFormat::resetParser()
@ -305,30 +305,30 @@ void JSONEachRowRowInputFormat::resetParser()
void JSONEachRowRowInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
skipBOMIfExists(*in);
skipWhitespaceIfAny(in);
if (!in.eof() && *in.position() == '[')
skipWhitespaceIfAny(*in);
if (!in->eof() && *in->position() == '[')
{
++in.position();
++in->position();
data_in_square_brackets = true;
}
}
void JSONEachRowRowInputFormat::readSuffix()
{
skipWhitespaceIfAny(in);
skipWhitespaceIfAny(*in);
if (data_in_square_brackets)
{
assertChar(']', in);
skipWhitespaceIfAny(in);
assertChar(']', *in);
skipWhitespaceIfAny(*in);
}
if (!in.eof() && *in.position() == ';')
if (!in->eof() && *in->position() == ';')
{
++in.position();
skipWhitespaceIfAny(in);
++in->position();
skipWhitespaceIfAny(*in);
}
assertEOF(in);
assertEOF(*in);
}

View File

@ -29,13 +29,13 @@ void LineAsStringRowInputFormat::readLineObject(IColumn & column)
{
DB::Memory<> object;
char * pos = in.position();
char * pos = in->position();
bool need_more_data = true;
while (loadAtPosition(in, object, pos) && need_more_data)
while (loadAtPosition(*in, object, pos) && need_more_data)
{
pos = find_first_symbols<'\n'>(pos, in.buffer().end());
if (pos == in.buffer().end())
pos = find_first_symbols<'\n'>(pos, in->buffer().end());
if (pos == in->buffer().end())
continue;
if (*pos == '\n')
@ -44,8 +44,8 @@ void LineAsStringRowInputFormat::readLineObject(IColumn & column)
++pos;
}
saveUpToPosition(in, object, pos);
loadAtPosition(in, object, pos);
saveUpToPosition(*in, object, pos);
loadAtPosition(*in, object, pos);
/// Last character is always \n.
column.insertData(object.data(), object.size() - 1);
@ -53,7 +53,7 @@ void LineAsStringRowInputFormat::readLineObject(IColumn & column)
bool LineAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in.eof())
if (in->eof())
return false;
readLineObject(*columns[0]);

View File

@ -27,7 +27,7 @@ namespace ErrorCodes
}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_)), buf(in), parser(visitor), data_types(header_.getDataTypes()) {}
: IRowInputFormat(header_, in_, std::move(params_)), buf(*in), parser(visitor), data_types(header_.getDataTypes()) {}
void MsgPackRowInputFormat::resetParser()
{

View File

@ -14,7 +14,7 @@ class NativeInputFormatFromNativeBlockInputStream : public IInputFormat
public:
NativeInputFormatFromNativeBlockInputStream(const Block & header, ReadBuffer & in_)
: IInputFormat(header, in_)
, stream(std::make_shared<NativeBlockInputStream>(in, header, 0))
, stream(std::make_shared<NativeBlockInputStream>(*in, header, 0))
{
}

View File

@ -93,7 +93,7 @@ static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
void ORCBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(in), arrow::default_memory_pool(), &file_reader));
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in), arrow::default_memory_pool(), &file_reader));
stripe_total = file_reader->NumberOfStripes();
stripe_current = 0;

View File

@ -37,7 +37,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
// Segmentating the original input.
unit.segment.resize(0);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(in, unit.segment, min_chunk_bytes);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(*in, unit.segment, min_chunk_bytes);
unit.offset = successfully_read_rows_count;
successfully_read_rows_count += currently_read_rows;

View File

@ -92,7 +92,7 @@ static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
void ParquetBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(in), arrow::default_memory_pool(), &file_reader));
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in), arrow::default_memory_pool(), &file_reader));
row_group_total = file_reader->num_row_groups();
row_group_current = 0;

View File

@ -29,12 +29,12 @@ RawBLOBRowInputFormat::RawBLOBRowInputFormat(const Block & header_, ReadBuffer &
bool RawBLOBRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in.eof())
if (in->eof())
return false;
/// One excessive copy.
String blob;
readStringUntilEOF(blob, in);
readStringUntilEOF(blob, *in);
columns.at(0)->insertData(blob.data(), blob.size());
return false;
}

View File

@ -30,7 +30,7 @@ void TSKVRowInputFormat::readPrefix()
{
/// In this format, we assume that column name cannot contain BOM,
/// so BOM at beginning of stream cannot be confused with name of field, and it is safe to skip it.
skipBOMIfExists(in);
skipBOMIfExists(*in);
}
@ -95,7 +95,7 @@ static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp)
bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (in.eof())
if (in->eof())
return false;
const auto & header = getPort().getHeader();
@ -105,17 +105,17 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
read_columns.assign(num_columns, false);
seen_columns.assign(num_columns, false);
if (unlikely(*in.position() == '\n'))
if (unlikely(*in->position() == '\n'))
{
/// An empty string. It is permissible, but it is unclear why.
++in.position();
++in->position();
}
else
{
while (true)
{
StringRef name_ref;
bool has_value = readName(in, name_ref, name_buf);
bool has_value = readName(*in, name_ref, name_buf);
ssize_t index = -1;
if (has_value)
@ -131,7 +131,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
/// If the key is not found, skip the value.
NullOutput sink;
readEscapedStringInto(sink, in);
readEscapedStringInto(sink, *in);
}
else
{
@ -144,9 +144,9 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
const auto & type = getPort().getHeader().getByPosition(index).type;
const auto & serialization = serializations[index];
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = SerializationNullable::deserializeTextEscapedImpl(*columns[index], in, format_settings, serialization);
read_columns[index] = SerializationNullable::deserializeTextEscapedImpl(*columns[index], *in, format_settings, serialization);
else
serialization->deserializeTextEscaped(*columns[index], in, format_settings);
serialization->deserializeTextEscaped(*columns[index], *in, format_settings);
}
}
else
@ -156,18 +156,18 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
throw Exception("Found field without value while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
}
if (in.eof())
if (in->eof())
{
throw ParsingException("Unexpected end of stream after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_READ_ALL_DATA);
}
else if (*in.position() == '\t')
else if (*in->position() == '\t')
{
++in.position();
++in->position();
continue;
}
else if (*in.position() == '\n')
else if (*in->position() == '\n')
{
++in.position();
++in->position();
break;
}
else
@ -198,7 +198,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
void TSKVRowInputFormat::syncAfterError()
{
skipToUnescapedNextLineOrEOF(in);
skipToUnescapedNextLineOrEOF(*in);
}

View File

@ -35,15 +35,15 @@ public:
{
String tmp;
while (!in.eof())
while (!in->eof())
{
char * pos = find_first_symbols<'\n', '\t'>(in.position(), in.buffer().end());
char * pos = find_first_symbols<'\n', '\t'>(in->position(), in->buffer().end());
tmp.append(in.position(), pos - in.position());
in.position() = pos;
tmp.append(in->position(), pos - in->position());
in->position() = pos;
if (pos == in.buffer().end())
in.next();
if (pos == in->buffer().end())
in->next();
else
break;
}

View File

@ -134,7 +134,7 @@ void TabSeparatedRowInputFormat::readPrefix()
/// In this format, we assume that column name or type cannot contain BOM,
/// so, if format has header,
/// then BOM at beginning of stream cannot be confused with name or type of field, and it is safe to skip it.
skipBOMIfExists(in);
skipBOMIfExists(*in);
}
/// This is a bit of abstraction leakage, but we have almost the same code in other places.
@ -146,12 +146,12 @@ void TabSeparatedRowInputFormat::readPrefix()
String column_name;
for (;;)
{
readEscapedString(column_name, in);
if (!checkChar('\t', in))
readEscapedString(column_name, *in);
if (!checkChar('\t', *in))
{
/// Check last column for \r before adding it, otherwise an error will be:
/// "Unknown field found in TSV header"
checkForCarriageReturn(in);
checkForCarriageReturn(*in);
addInputColumn(column_name);
break;
}
@ -160,15 +160,15 @@ void TabSeparatedRowInputFormat::readPrefix()
}
if (!in.eof())
if (!in->eof())
{
assertChar('\n', in);
assertChar('\n', *in);
}
}
else
{
setupAllColumnsByTableSchema();
skipTSVRow(in, column_mapping->column_indexes_for_input_fields.size());
skipTSVRow(*in, column_mapping->column_indexes_for_input_fields.size());
}
}
else if (!column_mapping->is_set)
@ -176,14 +176,14 @@ void TabSeparatedRowInputFormat::readPrefix()
if (with_types)
{
skipTSVRow(in, column_mapping->column_indexes_for_input_fields.size());
skipTSVRow(*in, column_mapping->column_indexes_for_input_fields.size());
}
}
bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (in.eof())
if (in->eof())
return false;
updateDiagnosticInfo();
@ -201,20 +201,20 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
else
{
NullOutput null_sink;
readEscapedStringInto(null_sink, in);
readEscapedStringInto(null_sink, *in);
}
/// skip separators
if (file_column + 1 < column_mapping->column_indexes_for_input_fields.size())
{
assertChar('\t', in);
assertChar('\t', *in);
}
else if (!in.eof())
else if (!in->eof())
{
if (unlikely(row_num == 1))
checkForCarriageReturn(in);
checkForCarriageReturn(*in);
assertChar('\n', in);
assertChar('\n', *in);
}
}
@ -227,8 +227,8 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
bool TabSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr & type,
const SerializationPtr & serialization, bool is_last_file_column)
{
const bool at_delimiter = !is_last_file_column && !in.eof() && *in.position() == '\t';
const bool at_last_column_line_end = is_last_file_column && (in.eof() || *in.position() == '\n');
const bool at_delimiter = !is_last_file_column && !in->eof() && *in->position() == '\t';
const bool at_last_column_line_end = is_last_file_column && (in->eof() || *in->position() == '\n');
if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end))
{
@ -236,9 +236,9 @@ bool TabSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr &
return false;
}
else if (format_settings.null_as_default && !type->isNullable())
return SerializationNullable::deserializeTextEscapedImpl(column, in, format_settings, serialization);
return SerializationNullable::deserializeTextEscapedImpl(column, *in, format_settings, serialization);
serialization->deserializeTextEscaped(column, in, format_settings);
serialization->deserializeTextEscaped(column, *in, format_settings);
return true;
}
@ -246,7 +246,7 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
{
for (size_t file_column = 0; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
{
if (file_column == 0 && in.eof())
if (file_column == 0 && in->eof())
{
out << "<End of stream>\n";
return false;
@ -272,21 +272,21 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
/// Delimiters
if (file_column + 1 == column_mapping->column_indexes_for_input_fields.size())
{
if (!in.eof())
if (!in->eof())
{
try
{
assertChar('\n', in);
assertChar('\n', *in);
}
catch (const DB::Exception &)
{
if (*in.position() == '\t')
if (*in->position() == '\t')
{
out << "ERROR: Tab found where line feed is expected."
" It's like your file has more columns than expected.\n"
"And if your file have right number of columns, maybe it have unescaped tab in value.\n";
}
else if (*in.position() == '\r')
else if (*in->position() == '\r')
{
out << "ERROR: Carriage return found where line feed is expected."
" It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n";
@ -294,7 +294,7 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
else
{
out << "ERROR: There is no line feed. ";
verbosePrintString(in.position(), in.position() + 1, out);
verbosePrintString(in->position(), in->position() + 1, out);
out << " found instead.\n";
}
return false;
@ -305,25 +305,25 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
{
try
{
assertChar('\t', in);
assertChar('\t', *in);
}
catch (const DB::Exception &)
{
if (*in.position() == '\n')
if (*in->position() == '\n')
{
out << "ERROR: Line feed found where tab is expected."
" It's like your file has less columns than expected.\n"
"And if your file have right number of columns, "
"maybe it have unescaped backslash in value before tab, which cause tab has escaped.\n";
}
else if (*in.position() == '\r')
else if (*in->position() == '\r')
{
out << "ERROR: Carriage return found where tab is expected.\n";
}
else
{
out << "ERROR: There is no tab. ";
verbosePrintString(in.position(), in.position() + 1, out);
verbosePrintString(in->position(), in->position() + 1, out);
out << " found instead.\n";
}
return false;
@@ -342,19 +342,19 @@ void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, I
bool can_be_parsed_as_null = removeLowCardinality(type)->isNullable();
// Check for a NULL value when the type is not Nullable. For simplicity we don't cross the buffer boundary, so some cases may be missed.
- if (!can_be_parsed_as_null && !in.eof())
+ if (!can_be_parsed_as_null && !in->eof())
{
- if (*in.position() == '\\' && in.available() >= 2)
+ if (*in->position() == '\\' && in->available() >= 2)
{
- ++in.position();
- if (*in.position() == 'N')
+ ++in->position();
+ if (*in->position() == 'N')
{
- ++in.position();
+ ++in->position();
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected NULL value of not Nullable type {}", type->getName());
}
else
{
- --in.position();
+ --in->position();
}
}
}
@@ -365,13 +365,13 @@ void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, I
else
{
NullOutput null_sink;
- readEscapedStringInto(null_sink, in);
+ readEscapedStringInto(null_sink, *in);
}
}
void TabSeparatedRowInputFormat::syncAfterError()
{
- skipToUnescapedNextLineOrEOF(in);
+ skipToUnescapedNextLineOrEOF(*in);
}
void TabSeparatedRowInputFormat::resetParser()
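The `\N` check in tryDeserializeField above peeks at most two bytes and rolls the cursor back, so the non-null path still sees the whole field. A minimal self-contained sketch of that peek-and-rollback idea, using a hypothetical Cursor type in place of DB::ReadBuffer (all names here are illustrative, not the ClickHouse API):

#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical minimal cursor over an in-memory buffer, standing in for DB::ReadBuffer.
struct Cursor
{
    const char * pos;
    const char * end;
    bool eof() const { return pos == end; }
    size_t available() const { return static_cast<size_t>(end - pos); }
};

// Peek for the TSV NULL literal "\N" without consuming the field on the non-null path.
bool startsWithNullLiteral(Cursor & in)
{
    if (!in.eof() && *in.pos == '\\' && in.available() >= 2)
    {
        ++in.pos;                 // step over the backslash
        if (*in.pos == 'N')
        {
            ++in.pos;             // consume 'N': the value really is \N
            return true;
        }
        --in.pos;                 // roll back: it was an ordinary escaped character
    }
    return false;
}

int main()
{
    std::string data = "\\N\t";
    Cursor in{data.data(), data.data() + data.size()};
    assert(startsWithNullLiteral(in));  // detects \N and leaves the cursor after it
}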

View File

@@ -29,15 +29,15 @@ void RowInputFormatWithDiagnosticInfo::updateDiagnosticInfo()
++row_num;
bytes_read_at_start_of_buffer_on_prev_row = bytes_read_at_start_of_buffer_on_current_row;
- bytes_read_at_start_of_buffer_on_current_row = in.count() - in.offset();
+ bytes_read_at_start_of_buffer_on_current_row = in->count() - in->offset();
offset_of_prev_row = offset_of_current_row;
- offset_of_current_row = in.offset();
+ offset_of_current_row = in->offset();
}
String RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
{
- if (in.eof())
+ if (in->eof())
return "Buffer has gone, cannot extract information about what has been parsed.";
WriteBufferFromOwnString out;
@@ -46,7 +46,7 @@ String RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
MutableColumns columns = header.cloneEmptyColumns();
/// It is possible to display detailed diagnostics only if the last and next to last rows are still in the read buffer.
- size_t bytes_read_at_start_of_buffer = in.count() - in.offset();
+ size_t bytes_read_at_start_of_buffer = in->count() - in->offset();
if (bytes_read_at_start_of_buffer != bytes_read_at_start_of_buffer_on_prev_row)
{
out << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n";
@@ -65,9 +65,9 @@ String RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
/// Roll back the cursor to the beginning of the previous or current row and parse all over again. But now we derive detailed information.
- if (offset_of_prev_row <= in.buffer().size())
+ if (offset_of_prev_row <= in->buffer().size())
{
- in.position() = in.buffer().begin() + offset_of_prev_row;
+ in->position() = in->buffer().begin() + offset_of_prev_row;
out << "\nRow " << (row_num - 1) << ":\n";
if (!parseRowAndPrintDiagnosticInfo(columns, out))
@@ -75,13 +75,13 @@ String RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
}
else
{
- if (in.buffer().size() < offset_of_current_row)
+ if (in->buffer().size() < offset_of_current_row)
{
out << "Could not print diagnostic info because parsing of data hasn't started.\n";
return out.str();
}
- in.position() = in.buffer().begin() + offset_of_current_row;
+ in->position() = in->buffer().begin() + offset_of_current_row;
}
out << "\nRow " << row_num << ":\n";
@@ -101,7 +101,7 @@ bool RowInputFormatWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(co
<< "name: " << alignedName(col_name, max_length_of_column_name)
<< "type: " << alignedName(type->getName(), max_length_of_data_type_name);
- auto * prev_position = in.position();
+ auto * prev_position = in->position();
std::exception_ptr exception;
try
@@ -112,7 +112,7 @@ bool RowInputFormatWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(co
{
exception = std::current_exception();
}
- auto * curr_position = in.position();
+ auto * curr_position = in->position();
if (curr_position < prev_position)
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
@@ -123,7 +123,7 @@ bool RowInputFormatWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(co
if (curr_position == prev_position)
{
out << "ERROR: text ";
- verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out);
+ verbosePrintString(prev_position, std::min(prev_position + 10, in->buffer().end()), out);
out << " is not like " << type->getName() << "\n";
return false;
}
@@ -152,7 +152,7 @@ bool RowInputFormatWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(co
if (isGarbageAfterField(file_column, curr_position))
{
out << "ERROR: garbage after " << type->getName() << ": ";
- verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out);
+ verbosePrintString(curr_position, std::min(curr_position + 10, in->buffer().end()), out);
out << "\n";
if (type->getName() == "DateTime")

View File

@@ -5,6 +5,7 @@
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromFile.h>
+ #include <IO/EmptyReadBuffer.h>
#include <DataStreams/BlockIO.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
@@ -26,8 +27,7 @@ namespace ErrorCodes
extern const int UNKNOWN_TYPE_OF_QUERY;
}
- std::pair<InputFormatPtr, Pipe> getSourceFromASTInsertQuery(
+ InputFormatPtr getInputFormatFromASTInsertQuery(
const ASTPtr & ast,
bool with_buffers,
const Block & header,
@@ -57,12 +57,24 @@ std::pair<InputFormatPtr, Pipe> getSourceFromASTInsertQuery(
std::unique_ptr<ReadBuffer> input_buffer = with_buffers
? getReadBufferFromASTInsertQuery(ast)
- : std::make_unique<ConcatReadBuffer>();
+ : std::make_unique<EmptyReadBuffer>();
auto source = FormatFactory::instance().getInput(format, *input_buffer, header, context, context->getSettings().max_insert_block_size);
+ source->addBuffer(std::move(input_buffer));
+ return source;
+ }
+ Pipe getSourceFromASTInsertQuery(
+ const ASTPtr & ast,
+ bool with_buffers,
+ const Block & header,
+ ContextPtr context,
+ const ASTPtr & input_function)
+ {
+ auto source = getInputFormatFromASTInsertQuery(ast, with_buffers, header, context, input_function);
+ Pipe pipe(source);
const auto * ast_insert_query = ast->as<ASTInsertQuery>();
if (context->getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)
{
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context);
@@ -77,7 +89,7 @@ std::pair<InputFormatPtr, Pipe> getSourceFromASTInsertQuery(
}
}
- return {std::move(source), std::move(pipe)};
+ return pipe;
}
std::unique_ptr<ReadBuffer> getReadBufferFromASTInsertQuery(const ASTPtr & ast)
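The split above separates building the input format from building the pipe, so a caller that only drives the parser (such as the asynchronous insert path) can skip pipe construction, while the Pipe-returning overload just delegates to the narrow helper. A minimal sketch of that delegation shape, with hypothetical stand-in types rather than the real InputFormatPtr and Pipe:

#include <iostream>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-ins for InputFormatPtr and Pipe, only to illustrate the split.
struct Format { };
using FormatPtr = std::shared_ptr<Format>;

struct Pipe
{
    explicit Pipe(FormatPtr source) { sources.push_back(std::move(source)); }
    std::vector<FormatPtr> sources;
};

// Narrow helper: builds only the parser; this is all a format-driving caller needs.
FormatPtr makeInputFormat()
{
    return std::make_shared<Format>();
}

// Wide helper keeps the old Pipe-returning interface by delegating.
Pipe makeSource()
{
    return Pipe(makeInputFormat());
}

int main()
{
    auto pipe = makeSource();
    std::cout << "sources in pipe: " << pipe.sources.size() << '\n';
}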

View File

@@ -17,7 +17,14 @@ namespace DB
class Pipe;
- std::pair<InputFormatPtr, Pipe> getSourceFromASTInsertQuery(
+ InputFormatPtr getInputFormatFromASTInsertQuery(
const ASTPtr & ast,
bool with_buffers,
const Block & header,
+ ContextPtr context,
+ const ASTPtr & input_function);
+ Pipe getSourceFromASTInsertQuery(
+ const ASTPtr & ast,
+ bool with_buffers,
+ const Block & header,

View File

@@ -5,6 +5,7 @@
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
+ #include <Processors/Executors/StreamingFormatExecutor.h>
#include <common/logger_useful.h>
#include <Interpreters/Context.h>
@@ -78,7 +79,6 @@ Block KafkaBlockInputStream::readImpl()
// now it's a one-time usage InputStream
// one block of the needed size (or with the desired flush timeout) is formed in one internal iteration
// otherwise external iteration would reuse that and the logic would become even more fuzzy
- MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM;
@@ -86,92 +86,45 @@
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
- InputPort port(input_format->getPort().getHeader(), input_format.get());
- connect(input_format->getPort(), port);
- port.setNeeded();
std::optional<std::string> exception_message;
- auto read_kafka_message = [&]
- {
- size_t new_rows = 0;
- while (true)
- {
- auto status = input_format->prepare();
- switch (status)
- {
- case IProcessor::Status::Ready:
- input_format->work();
- break;
- case IProcessor::Status::Finished:
- input_format->resetParser();
- return new_rows;
- case IProcessor::Status::PortFull:
- {
- auto chunk = port.pull();
- // that was returning bad value before https://github.com/ClickHouse/ClickHouse/pull/8005
- // if will be backported should go together with #8005
- auto chunk_rows = chunk.getNumRows();
- new_rows += chunk_rows;
- auto columns = chunk.detachColumns();
- for (size_t i = 0, s = columns.size(); i < s; ++i)
- {
- result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
- }
- break;
- }
- case IProcessor::Status::NeedData:
- case IProcessor::Status::Async:
- case IProcessor::Status::ExpandPipeline:
- throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
- }
- }
- };
- size_t total_rows = 0;
- size_t failed_poll_attempts = 0;
+ size_t total_rows = 0;
+ size_t new_rows = 0;
+ size_t failed_poll_attempts = 0;
+ auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+ {
+ if (put_error_to_stream)
+ {
+ exception_message = e.message();
+ for (const auto & column : result_columns)
+ {
+ // read_kafka_message could already push some rows to result_columns
+ // before exception, we need to fix it.
+ auto cur_rows = column->size();
+ if (cur_rows > total_rows)
+ column->popBack(cur_rows - total_rows);
+ // all data columns will get default value in case of error
+ column->insertDefault();
+ }
+ return 1;
+ }
+ else
+ {
+ e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
+ buffer->currentTopic(), buffer->currentPartition(), buffer->currentOffset());
+ throw;
+ }
+ };
+ StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error));
while (true)
{
- size_t new_rows = 0;
exception_message.reset();
if (buffer->poll())
- {
- try
- {
- new_rows = read_kafka_message();
- }
- catch (Exception & e)
- {
- if (put_error_to_stream)
- {
- input_format->resetParser();
- exception_message = e.message();
- for (auto & column : result_columns)
- {
- // read_kafka_message could already push some rows to result_columns
- // before exception, we need to fix it.
- auto cur_rows = column->size();
- if (cur_rows > total_rows)
- {
- column->popBack(cur_rows - total_rows);
- }
- // all data columns will get default value in case of error
- column->insertDefault();
- }
- new_rows = 1;
- }
- else
- {
- e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'", buffer->currentTopic(), buffer->currentPartition(), buffer->currentOffset());
- throw;
- }
- }
- }
+ new_rows = executor.execute();
if (new_rows)
{
@@ -276,7 +229,7 @@ Block KafkaBlockInputStream::readImpl()
// i.e. will not be stored anywhere
// If any extra columns are needed, they can be added at the MV level using DEFAULT.
- auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
+ auto result_block = non_virtual_header.cloneWithColumns(executor.getResultColumns());
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
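The refactoring above replaces the hand-rolled prepare()/work()/pull() loop with StreamingFormatExecutor: construct it with the header, the format and an optional error callback, call execute() once per polled message, and take the accumulated columns at the end. A minimal self-contained model of that contract (MiniExecutor and its plain-int "columns" are illustrative stand-ins, not the real class):

#include <cstddef>
#include <exception>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

class MiniExecutor
{
public:
    using Parse = std::function<size_t(std::vector<int> &)>;
    using OnError = std::function<size_t(std::vector<int> &, const std::exception &)>;

    MiniExecutor(Parse parse_, OnError on_error_)
        : parse(std::move(parse_)), on_error(std::move(on_error_)) {}

    // Parse whatever the source currently holds and return the number of new rows;
    // on failure the callback decides between emitting placeholder rows and rethrowing.
    size_t execute()
    {
        try
        {
            return parse(result_columns);
        }
        catch (const std::exception & e)
        {
            return on_error(result_columns, e);
        }
    }

    std::vector<int> getResultColumns() { return std::move(result_columns); }

private:
    std::vector<int> result_columns;
    Parse parse;
    OnError on_error;
};

int main()
{
    MiniExecutor executor(
        [](std::vector<int> & cols) { cols.push_back(42); return size_t{1}; },
        [](std::vector<int> & cols, const std::exception &) { cols.push_back(0); return size_t{1}; });

    size_t total_rows = executor.execute();  // one successful "message"
    std::cout << total_rows << " row(s), "
              << executor.getResultColumns().size() << " value(s)\n";
}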

View File

@@ -3,6 +3,7 @@
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
+ #include <Processors/Executors/StreamingFormatExecutor.h>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
namespace ErrorCodes
@@ -87,56 +88,11 @@ Block RabbitMQBlockInputStream::readImpl()
finished = true;
- MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
- InputPort port(input_format->getPort().getHeader(), input_format.get());
- connect(input_format->getPort(), port);
- port.setNeeded();
- auto read_rabbitmq_message = [&]
- {
- size_t new_rows = 0;
- while (true)
- {
- auto status = input_format->prepare();
- switch (status)
- {
- case IProcessor::Status::Ready:
- input_format->work();
- break;
- case IProcessor::Status::Finished:
- input_format->resetParser();
- return new_rows;
- case IProcessor::Status::PortFull:
- {
- auto chunk = port.pull();
- auto chunk_rows = chunk.getNumRows();
- new_rows += chunk_rows;
- auto columns = chunk.detachColumns();
- for (size_t i = 0, s = columns.size(); i < s; ++i)
- {
- result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
- }
- break;
- }
- case IProcessor::Status::NeedData:
- case IProcessor::Status::Async:
- case IProcessor::Status::ExpandPipeline:
- throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
- }
- }
- };
+ StreamingFormatExecutor executor(non_virtual_header, input_format);
size_t total_rows = 0;
@@ -145,7 +101,7 @@ Block RabbitMQBlockInputStream::readImpl()
if (buffer->eof())
break;
- auto new_rows = read_rabbitmq_message();
+ auto new_rows = executor.execute();
if (new_rows)
{
@@ -180,7 +136,7 @@ Block RabbitMQBlockInputStream::readImpl()
if (total_rows == 0)
return Block();
- auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
+ auto result_block = non_virtual_header.cloneWithColumns(executor.getResultColumns());
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
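With the executor factored out, both the Kafka and RabbitMQ readers reduce to the same polling loop. A minimal runnable sketch of that loop shape, where poll() and execute() are hypothetical stand-ins for the buffer and executor calls in the diff, and the cutoffs stand in for max_block_size and the flush timeout:

#include <cstddef>
#include <iostream>

static int messages_left = 3;

bool poll() { return messages_left-- > 0; }  // pretend each poll yields a message
size_t execute() { return 1; }               // pretend each message parses to one row

int main()
{
    size_t total_rows = 0;
    size_t failed_poll_attempts = 0;

    while (true)
    {
        size_t new_rows = 0;
        if (poll())
            new_rows = execute();            // the executor does the parsing

        if (new_rows)
            total_rows += new_rows;
        else if (++failed_poll_attempts >= 2) // give up after repeated empty polls
            break;

        if (total_rows >= 3)                 // max_block_size stand-in
            break;
    }

    std::cout << "rows in block: " << total_rows << '\n';
}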