This commit is contained in:
Nikolay Degterinsky 2022-09-01 13:23:34 +00:00
parent 5a611e272a
commit e9232fc4e6

View File

@ -39,6 +39,7 @@
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <memory>
#include <cmath>
#include <unistd.h>
@ -1293,49 +1294,43 @@ try
/// Create the header block
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
auto context_const = WithContext(context).getContext();
context->makeGlobalContext();
Block header;
ColumnsDescription schema_columns;
if (structure.empty())
{
ReadBufferIterator read_buffer_iterator = [&]()
ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &)
{
return std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
auto file = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
/// stdin must be seekable
auto res = lseek(file->getFD(), 0, SEEK_SET);
if (-1 == res)
throwFromErrno("Input must be seekable file (it will be read twice).", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
return file;
};
auto context_const = WithContext(context).getContext();
auto schema_columns = readSchemaFromFormat(input_format, {}, read_buffer_iterator, false, context_const);
auto schema_columns_info = schema_columns.getOrdinary();
for (auto & info : schema_columns_info)
{
ColumnWithTypeAndName column;
column.name = info.name;
column.type = info.type;
column.column = column.type->createColumn();
header.insert(std::move(column));
}
schema_columns = readSchemaFromFormat(input_format, {}, read_buffer_iterator, false, context_const);
}
else
{
std::vector<std::string> structure_vals;
boost::split(structure_vals, structure, boost::algorithm::is_any_of(" ,"), boost::algorithm::token_compress_on);
schema_columns = parseColumnsListFromString(structure, context_const);
}
if (structure_vals.size() % 2 != 0)
throw Exception("Odd number of elements in section structure: must be a list of name type pairs", ErrorCodes::LOGICAL_ERROR);
auto schema_columns_info = schema_columns.getOrdinary();
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
for (size_t i = 0, size = structure_vals.size(); i < size; i += 2)
{
ColumnWithTypeAndName column;
column.name = structure_vals[i];
column.type = data_type_factory.get(structure_vals[i + 1]);
column.column = column.type->createColumn();
header.insert(std::move(column));
}
for (auto & info : schema_columns_info)
{
ColumnWithTypeAndName column;
column.name = info.name;
column.type = info.type;
column.column = column.type->createColumn();
header.insert(std::move(column));
}
ReadBufferFromFileDescriptor file_in(STDIN_FILENO);