Detect format and schema for stdin in clickhouse-local

This commit is contained in:
avogar 2022-01-24 21:41:44 +03:00
parent b9fb741c79
commit a6740d2f9a
9 changed files with 124 additions and 16 deletions

View File

@ -13,6 +13,8 @@
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <fcntl.h>
#include <unistd.h>
#include <boost/algorithm/string/case_conv.hpp>
@ -410,6 +412,9 @@ void FormatFactory::registerFileExtension(const String & extension, const String
String FormatFactory::getFormatFromFileName(String file_name, bool throw_if_not_found)
{
if (file_name == "stdin")
return getFormatFromFileDescriptor(STDIN_FILENO);
CompressionMethod compression_method = chooseCompressionMethod(file_name, "");
if (CompressionMethod::None != compression_method)
{
@ -438,6 +443,25 @@ String FormatFactory::getFormatFromFileName(String file_name, bool throw_if_not_
return it->second;
}
String FormatFactory::getFormatFromFileDescriptor(int fd)
{
#ifdef OS_LINUX
char buf[32] = {'\0'};
snprintf(buf, sizeof(buf), "/proc/self/fd/%d", fd);
char file_path[PATH_MAX] = {'\0'};
if (readlink(buf, file_path, sizeof(file_path) - 1) != -1)
return getFormatFromFileName(file_path, false);
return "";
#elif defined(__APPLE__)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return getFormatFromFileName(file_path, false);
return "";
#else
return "";
#endif
}
void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)
{
auto & target = dict[name].file_segmentation_engine;

View File

@ -174,6 +174,7 @@ public:
/// Register file extension for format
void registerFileExtension(const String & extension, const String & format_name);
String getFormatFromFileName(String file_name, bool throw_if_not_found = false);
String getFormatFromFileDescriptor(int fd);
/// Register schema readers for format its name.
void registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator);

View File

@ -17,7 +17,12 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional<FormatSettings> & format_settings, ReadBufferCreator read_buffer_creator, ContextPtr context)
ColumnsDescription readSchemaFromFormat(
const String & format_name,
const std::optional<FormatSettings> & format_settings,
ReadBufferCreator read_buffer_creator,
ContextPtr context,
std::unique_ptr<ReadBuffer> & buf_out)
{
NamesAndTypesList names_and_types;
if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name))
@ -34,11 +39,11 @@ ColumnsDescription readSchemaFromFormat(const String & format_name, const std::o
}
else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
{
auto read_buf = read_buffer_creator();
if (read_buf->eof())
buf_out = read_buffer_creator();
if (buf_out->eof())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file, file is empty", format_name);
auto schema_reader = FormatFactory::instance().getSchemaReader(format_name, *read_buf, context, format_settings);
auto schema_reader = FormatFactory::instance().getSchemaReader(format_name, *buf_out, context, format_settings);
try
{
names_and_types = schema_reader->readSchema();
@ -54,6 +59,12 @@ ColumnsDescription readSchemaFromFormat(const String & format_name, const std::o
return ColumnsDescription(names_and_types);
}
ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional<FormatSettings> & format_settings, ReadBufferCreator read_buffer_creator, ContextPtr context)
{
std::unique_ptr<ReadBuffer> buf_out;
return readSchemaFromFormat(format_name, format_settings, read_buffer_creator, context, buf_out);
}
DataTypePtr generalizeDataType(DataTypePtr type)
{
WhichDataType which(type);

View File

@ -15,7 +15,19 @@ namespace DB
/// If format doesn't have any schema reader or a schema reader
/// couldn't determine the schema, an exception will be thrown.
using ReadBufferCreator = std::function<std::unique_ptr<ReadBuffer>()>;
ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional<FormatSettings> & format_settings, ReadBufferCreator read_buffer_creator, ContextPtr context);
ColumnsDescription readSchemaFromFormat(
const String & format_name,
const std::optional<FormatSettings> & format_settings,
ReadBufferCreator read_buffer_creator,
ContextPtr context);
/// If ReadBuffer is created, it will be written to buf_out.
ColumnsDescription readSchemaFromFormat(
const String & format_name,
const std::optional<FormatSettings> & format_settings,
ReadBufferCreator read_buffer_creator,
ContextPtr context,
std::unique_ptr<ReadBuffer> & buf_out);
/// Convert type to the most general type:
/// - IntN, UIntN, FloatN, Decimal -> Float64

View File

@ -217,8 +217,33 @@ Strings StorageFile::getPathsList(const String & table_path, const String & user
return paths;
}
ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr context)
{
/// If we want to read schema from file descriptor we should create
/// a read buffer from fd, create a checkpoint, read some data required
/// for schema inference, rollback to checkpoint and then use the created
/// peekable read buffer on the first read from storage. It's needed because
/// in case of file descriptor we have a stream of data and we cannot
/// start reading data from the beginning after reading some data for
/// schema inference.
auto read_buffer_creator = [&]()
{
/// We will use PeekableReadBuffer to create a checkpoint, so we need a place
/// where we can store the original read buffer.
read_buffer_from_fd = createReadBuffer("", true, getName(), table_fd, compression_method, context);
auto read_buf = std::make_unique<PeekableReadBuffer>(*read_buffer_from_fd);
read_buf->setCheckpoint();
return read_buf;
};
ColumnsDescription StorageFile::getTableStructureFromData(
auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_creator, context, peekable_read_buffer_from_fd);
if (peekable_read_buffer_from_fd)
/// If we have created read buffer in readSchemaFromFormat we should rollback to checkpoint.
assert_cast<PeekableReadBuffer *>(peekable_read_buffer_from_fd.get())->rollbackToCheckpoint();
return columns;
}
ColumnsDescription StorageFile::getTableStructureFromFile(
const String & format,
const std::vector<String> & paths,
const String & compression_method,
@ -271,8 +296,6 @@ StorageFile::StorageFile(int table_fd_, CommonArguments args)
throw Exception("Using file descriptor as source of storage isn't allowed for server daemons", ErrorCodes::DATABASE_ACCESS_DENIED);
if (args.format_name == "Distributed")
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);
if (args.columns.empty())
throw Exception("Automatic schema inference is not allowed when using file descriptor as source of storage", ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE);
is_db_table = false;
use_table_fd = true;
@ -321,9 +344,15 @@ void StorageFile::setStorageMetadata(CommonArguments args)
if (args.format_name == "Distributed" || args.columns.empty())
{
auto columns = getTableStructureFromData(format_name, paths, compression_method, format_settings, args.getContext());
if (!args.columns.empty() && args.columns != columns)
throw Exception("Table structure and file structure are different", ErrorCodes::INCOMPATIBLE_COLUMNS);
ColumnsDescription columns;
if (use_table_fd)
columns = getTableStructureFromFileDescriptor(args.getContext());
else
{
columns = getTableStructureFromFile(format_name, paths, compression_method, format_settings, args.getContext());
if (!args.columns.empty() && args.columns != columns)
throw Exception("Table structure and file structure are different", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
storage_metadata.setColumns(columns);
}
else
@ -395,11 +424,13 @@ public:
ContextPtr context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnsDescription columns_description_)
ColumnsDescription columns_description_,
std::unique_ptr<ReadBuffer> read_buf_)
: SourceWithProgress(getBlockForSource(storage_, metadata_snapshot_, columns_description_, files_info_))
, storage(std::move(storage_))
, metadata_snapshot(metadata_snapshot_)
, files_info(std::move(files_info_))
, read_buf(std::move(read_buf_))
, columns_description(std::move(columns_description_))
, context(context_)
, max_block_size(max_block_size_)
@ -441,7 +472,8 @@ public:
}
}
read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->getName(), storage->table_fd, storage->compression_method, context);
if (!read_buf)
read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->getName(), storage->table_fd, storage->compression_method, context);
auto get_block_for_format = [&]() -> Block
{
@ -587,7 +619,7 @@ Pipe StorageFile::read(
};
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, get_columns_for_format()));
this_ptr, metadata_snapshot, context, max_block_size, files_info, get_columns_for_format(), std::move(peekable_read_buffer_from_fd)));
}
return Pipe::unitePipes(std::move(pipes));

View File

@ -71,7 +71,9 @@ public:
bool supportsPartitionBy() const override { return true; }
static ColumnsDescription getTableStructureFromData(
ColumnsDescription getTableStructureFromFileDescriptor(ContextPtr context);
static ColumnsDescription getTableStructureFromFile(
const String & format,
const std::vector<String> & paths,
const String & compression_method,
@ -120,6 +122,11 @@ private:
size_t total_bytes_to_read = 0;
String path_for_partitioned_write;
/// These buffers are needed for schema inference when data source
/// is file descriptor. See getTableStructureFromFileDescriptor.
std::unique_ptr<ReadBuffer> read_buffer_from_fd;
std::unique_ptr<ReadBuffer> peekable_read_buffer_from_fd;
};
}

View File

@ -38,7 +38,7 @@ ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context
{
size_t total_bytes_to_read = 0;
Strings paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context, total_bytes_to_read);
return StorageFile::getTableStructureFromData(format, paths, compression_method, std::nullopt, context);
return StorageFile::getTableStructureFromFile(format, paths, compression_method, std::nullopt, context);
}
return parseColumnsListFromString(structure, context);

View File

@ -0,0 +1,10 @@
0
1
2
3
4
5
6
7
8
9

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "select * from numbers(10) format Parquet" > $CLICKHOUSE_TMP/data.parquet
$CLICKHOUSE_LOCAL -q "select * from table" < $CLICKHOUSE_TMP/data.parquet