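Make better: replace the ReadBufferIterator class hierarchy with a std::function and pass an explicit retry flag to readSchemaFromFormat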

This commit is contained in:
avogar 2022-04-19 19:16:47 +00:00
parent b6c0fd69d6
commit 1f252cedfe
7 changed files with 55 additions and 100 deletions

View File

@ -52,6 +52,7 @@ ColumnsDescription readSchemaFromFormat(
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ReadBufferIterator & read_buffer_iterator, ReadBufferIterator & read_buffer_iterator,
bool retry,
ContextPtr & context, ContextPtr & context,
std::unique_ptr<ReadBuffer> & buf_out) std::unique_ptr<ReadBuffer> & buf_out)
{ {
@ -59,20 +60,27 @@ ColumnsDescription readSchemaFromFormat(
if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name)) if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name))
{ {
auto external_schema_reader = FormatFactory::instance().getExternalSchemaReader(format_name, context, format_settings); auto external_schema_reader = FormatFactory::instance().getExternalSchemaReader(format_name, context, format_settings);
names_and_types = external_schema_reader->readSchema(); try
{
names_and_types = external_schema_reader->readSchema();
}
catch (const DB::Exception & e)
{
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message());
}
} }
else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name)) else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
{ {
std::string exception_messages; std::string exception_messages;
SchemaReaderPtr schema_reader; SchemaReaderPtr schema_reader;
std::unique_ptr<ReadBuffer> buf; std::unique_ptr<ReadBuffer> buf;
while ((buf = read_buffer_iterator.next())) while ((buf = read_buffer_iterator()))
{ {
if (buf->eof()) if (buf->eof())
{ {
auto exception_message = fmt::format("Cannot extract table structure from {} format file, file is empty\n", format_name); auto exception_message = fmt::format("Cannot extract table structure from {} format file, file is empty\n", format_name);
if (read_buffer_iterator.isSingle()) if (!retry)
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, exception_message); throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, exception_message);
exception_messages += "\n" + exception_message; exception_messages += "\n" + exception_message;
@ -90,7 +98,7 @@ ColumnsDescription readSchemaFromFormat(
{ {
auto exception_message = getCurrentExceptionMessage(false); auto exception_message = getCurrentExceptionMessage(false);
if (read_buffer_iterator.isSingle() || !isRetryableSchemaInferenceError(getCurrentExceptionCode())) if (!retry || !isRetryableSchemaInferenceError(getCurrentExceptionCode()))
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, exception_message); throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, exception_message);
exception_messages += "\n" + exception_message; exception_messages += "\n" + exception_message;
@ -122,10 +130,10 @@ ColumnsDescription readSchemaFromFormat(
return ColumnsDescription(names_and_types); return ColumnsDescription(names_and_types);
} }
ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional<FormatSettings> & format_settings, ReadBufferIterator & read_buffer_iterator, ContextPtr & context) ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional<FormatSettings> & format_settings, ReadBufferIterator & read_buffer_iterator, bool retry, ContextPtr & context)
{ {
std::unique_ptr<ReadBuffer> buf_out; std::unique_ptr<ReadBuffer> buf_out;
return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, context, buf_out); return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, retry, context, buf_out);
} }
DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type) DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type)

View File

@ -6,64 +6,7 @@
namespace DB namespace DB
{ {
class ReadBufferIterator using ReadBufferIterator = std::function<std::unique_ptr<ReadBuffer>()>;
{
public:
virtual std::unique_ptr<ReadBuffer> next() = 0;
virtual bool isSingle() const = 0;
virtual ~ReadBufferIterator() = default;
};
/// ReadBufferIterator implementation that walks the iterator range [begin_, end_)
/// and produces one read buffer per position by calling the supplied buffer creator.
template <class ListIterator, class BufferCreator>
class ReadBufferListIterator : public ReadBufferIterator
{
public:
/// Stores the range and the creator callable. The "single" flag is computed once here:
/// it is true only when the distance between begin_ and end_ is exactly 1.
ReadBufferListIterator(ListIterator begin_, ListIterator end_, BufferCreator buffer_creator_)
: it(begin_), end(end_), buffer_creator(buffer_creator_), is_single(std::distance(begin_, end_) == 1)
{
}
/// Returns nullptr once the current position has reached the end of the range.
/// Otherwise calls buffer_creator with the current position, then advances the
/// position by one and returns what the creator produced.
std::unique_ptr<ReadBuffer> next() override
{
if (it == end)
return nullptr;
/// The creator receives the stored iterator object itself, not the dereferenced element.
auto res = buffer_creator(it);
++it;
return res;
}
/// Reports the flag computed in the constructor (range contained exactly one element).
bool isSingle() const override { return is_single; }
private:
/// Current position in the range.
ListIterator it;
/// One-past-the-last position of the range.
ListIterator end;
/// Callable invoked with the current position to build a read buffer.
BufferCreator buffer_creator;
/// Cached result of the "exactly one element" check.
bool is_single;
};
/// ReadBufferIterator implementation that yields at most one read buffer,
/// obtained by calling the supplied creator with no arguments.
template <class BufferCreator>
class ReadBufferSingleIterator : public ReadBufferIterator
{
public:
/// Stores the creator callable; nothing is created until next() is called.
ReadBufferSingleIterator(BufferCreator buffer_creator_) : buffer_creator(buffer_creator_)
{
}
/// The first call marks the iterator as done and returns buffer_creator();
/// every later call returns nullptr.
std::unique_ptr<ReadBuffer> next() override
{
if (done)
return nullptr;
/// The flag is set before the creator runs, so the creator is invoked at most once.
done = true;
return buffer_creator();
}
/// Always true for this implementation.
bool isSingle() const override { return true; }
private:
/// Callable invoked without arguments to build the read buffer.
BufferCreator buffer_creator;
/// Becomes true on the first call to next().
bool done = false;
};
/// Try to determine the schema of the data in the specified format. /// Try to determine the schema of the data in the specified format.
/// For formats that have an external schema reader, it will /// For formats that have an external schema reader, it will
@ -79,6 +22,7 @@ ColumnsDescription readSchemaFromFormat(
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ReadBufferIterator & read_buffer_iterator, ReadBufferIterator & read_buffer_iterator,
bool retry,
ContextPtr & context); ContextPtr & context);
/// If ReadBuffer is created, it will be written to buf_out. /// If ReadBuffer is created, it will be written to buf_out.
@ -86,6 +30,7 @@ ColumnsDescription readSchemaFromFormat(
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ReadBufferIterator & read_buffer_iterator, ReadBufferIterator & read_buffer_iterator,
bool retry,
ContextPtr & context, ContextPtr & context,
std::unique_ptr<ReadBuffer> & buf_out); std::unique_ptr<ReadBuffer> & buf_out);

View File

@ -189,14 +189,15 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
"specify table structure manually", "specify table structure manually",
format); format);
auto read_buffer_creator = [&, uri_without_path = uri_without_path](std::vector<String>::const_iterator & it) -> std::unique_ptr<ReadBuffer> ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()]() mutable -> std::unique_ptr<ReadBuffer>
{ {
if (it == paths.end())
return nullptr;
auto compression = chooseCompressionMethod(*it, compression_method); auto compression = chooseCompressionMethod(*it, compression_method);
return wrapReadBufferWithCompressionMethod( return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it, ctx->getGlobalContext()->getConfigRef()), compression); std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression);
}; };
ReadBufferListIterator read_buffer_iterator(paths.cbegin(), paths.cend(), read_buffer_creator); return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, ctx);
} }
class HDFSSource::DisclosedGlobIterator::Impl class HDFSSource::DisclosedGlobIterator::Impl

View File

@ -237,7 +237,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c
/// in case of file descriptor we have a stream of data and we cannot /// in case of file descriptor we have a stream of data and we cannot
/// start reading data from the beginning after reading some data for /// start reading data from the beginning after reading some data for
/// schema inference. /// schema inference.
auto read_buffer_creator = [&]() ReadBufferIterator read_buffer_iterator = [&]()
{ {
/// We will use PeekableReadBuffer to create a checkpoint, so we need a place /// We will use PeekableReadBuffer to create a checkpoint, so we need a place
/// where we can store the original read buffer. /// where we can store the original read buffer.
@ -247,8 +247,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c
return read_buf; return read_buf;
}; };
ReadBufferSingleIterator read_buffer_iterator(read_buffer_creator); auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, false, context, peekable_read_buffer_from_fd);
auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, context, peekable_read_buffer_from_fd);
if (peekable_read_buffer_from_fd) if (peekable_read_buffer_from_fd)
{ {
/// If we have created a read buffer in readSchemaFromFormat, we should roll back to the checkpoint. /// If we have created a read buffer in readSchemaFromFormat, we should roll back to the checkpoint.
@ -282,13 +281,15 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
"table structure manually", "table structure manually",
format); format);
auto read_buffer_creator = [&](std::vector<String>::const_iterator & it) ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()]() mutable -> std::unique_ptr<ReadBuffer>
{ {
return createReadBuffer(*it, false, "File", -1, compression_method, context); if (it == paths.end())
return nullptr;
return createReadBuffer(*it++, false, "File", -1, compression_method, context);
}; };
ReadBufferListIterator read_buffer_iterator(paths.begin(), paths.end(), read_buffer_creator); return readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, context);
} }
bool StorageFile::isColumnOriented() const bool StorageFile::isColumnOriented() const

View File

@ -962,30 +962,29 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
ContextPtr ctx) ContextPtr ctx)
{ {
auto file_iterator = createFileIterator(client_auth, {client_auth.uri.key}, is_key_with_globs, distributed_processing, ctx); auto file_iterator = createFileIterator(client_auth, {client_auth.uri.key}, is_key_with_globs, distributed_processing, ctx);
std::vector<String> keys;
String key = (*file_iterator)();
while (!key.empty())
{
keys.push_back(key);
key = (*file_iterator)();
}
if (keys.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format)) ReadBufferIterator read_buffer_iterator = [&, first = false]() mutable -> std::unique_ptr<ReadBuffer>
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path in S3. You must specify "
"table structure manually",
format);
auto read_buffer_creator = [&](std::vector<String>::iterator & it)
{ {
auto key = (*file_iterator)();
if (key.empty())
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path in S3. You must specify "
"table structure manually",
format);
return nullptr;
}
first = false;
return wrapReadBufferWithCompressionMethod( return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>( std::make_unique<ReadBufferFromS3>(
client_auth.client, client_auth.uri.bucket, *it, max_single_read_retries, ctx->getReadSettings()), client_auth.client, client_auth.uri.bucket, key, max_single_read_retries, ctx->getReadSettings()),
chooseCompressionMethod(*it, compression_method)); chooseCompressionMethod(key, compression_method));
}; };
ReadBufferListIterator read_buffer_iterator(keys.begin(), keys.end(), read_buffer_creator);
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, ctx); return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);
} }

View File

@ -552,8 +552,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
} }
auto read_buffer_creator = [&](std::vector<String>::const_iterator & it) ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()]() mutable -> std::unique_ptr<ReadBuffer>
{ {
if (it == urls_to_check.cend())
return nullptr;
return StorageURLSource::getFirstAvailableURLReadBuffer( return StorageURLSource::getFirstAvailableURLReadBuffer(
it, it,
urls_to_check.cend(), urls_to_check.cend(),
@ -570,8 +573,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
context->getSettingsRef().max_download_threads); context->getSettingsRef().max_download_threads);
}; };
ReadBufferListIterator read_buffer_iterator(urls_to_check.cbegin(), urls_to_check.cend(), read_buffer_creator); return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, context);
} }
bool IStorageURLBase::isColumnOriented() const bool IStorageURLBase::isColumnOriented() const

View File

@ -49,12 +49,11 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const
{ {
auto read_buffer_creator = [&]() ReadBufferIterator read_buffer_iterator = [&]()
{ {
return std::make_unique<ReadBufferFromString>(data); return std::make_unique<ReadBufferFromString>(data);
}; };
ReadBufferSingleIterator read_buffer_iterator(read_buffer_creator); return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, false, context);
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, context);
} }
Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr context) const Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr context) const