Fix schema inference in s3Cluster and improve in hdfsCluster

avogar 2022-09-30 16:59:17 +00:00
parent cb530fa54c
commit c74b5c8126
11 changed files with 104 additions and 79 deletions

View File

@@ -3,6 +3,7 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTFunction.h>
#include <Storages/IStorage.h>
namespace DB
@@ -155,4 +156,20 @@ TablesWithColumns getDatabaseAndTablesWithColumns(
return tables_with_columns;
}
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
{
auto * select_query = query->as<ASTSelectQuery>();
if (!select_query || !select_query->tables())
return nullptr;
auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
if (!table_expression->table_function)
return nullptr;
auto * table_function = table_expression->table_function->as<ASTFunction>();
return table_function->arguments->as<ASTExpressionList>();
}
}
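
For orientation, a minimal sketch (not part of the commit) of what the new helper returns. The include path of its header is assumed from the hunks above; the parser calls are standard ClickHouse APIs.

#include <Interpreters/getTableExpressions.h> /// assumed location of the new helper
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>

using namespace DB;

ASTExpressionList * demoExtractArguments()
{
    /// A typical query the initiator of a cluster table function handles.
    String text = "SELECT * FROM s3Cluster('my_cluster', 'http://bucket/data.csv', 'CSV')";
    ParserSelectQuery parser;
    ASTPtr query = parseQuery(parser, text, /*max_query_size=*/0, /*max_parser_depth=*/1000);

    /// Returns the argument list 'my_cluster', 'http://bucket/data.csv', 'CSV';
    /// returns nullptr if the query does not read from a table function.
    return extractTableFunctionArgumentsFromSelectQuery(query);
}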

View File

@@ -3,6 +3,7 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Parsers/ASTExpressionList.h>
namespace DB
{
@@ -23,4 +24,6 @@ ASTPtr extractTableExpression(const ASTSelectQuery & select, size_t table_number
TablesWithColumns getDatabaseAndTablesWithColumns(
const ASTTableExprConstPtrs & table_expressions, ContextPtr context, bool include_alias_cols, bool include_materialized_cols);
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);
}

View File

@@ -25,6 +25,7 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/StorageDictionary.h>
#include <memory>
@@ -32,6 +33,34 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static ASTPtr addColumnsStructureToQuery(const ASTPtr & query, const String & structure)
{
/// Add an argument with the table structure to the hdfsCluster table function in the SELECT query.
auto result_query = query->clone();
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(result_query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function hdfsCluster, got '{}'", queryToString(query));
auto structure_literal = std::make_shared<ASTLiteral>(structure);
if (expression_list->children.size() != 2 && expression_list->children.size() != 3)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 or 3 arguments in hdfsCluster table functions, got {}", expression_list->children.size());
if (expression_list->children.size() == 2)
{
auto format_literal = std::make_shared<ASTLiteral>("auto");
expression_list->children.push_back(format_literal);
}
expression_list->children.push_back(structure_literal);
return result_query;
}
StorageHDFSCluster::StorageHDFSCluster(
ContextPtr context_,
String cluster_name_,
@@ -56,6 +85,7 @@ StorageHDFSCluster::StorageHDFSCluster(
{
auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
storage_metadata.setColumns(columns);
need_to_add_structure_to_query = true;
}
else
storage_metadata.setColumns(columns_);
@@ -92,6 +122,11 @@ Pipe StorageHDFSCluster::read(
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
auto query_to_send = query_info.original_query;
if (need_to_add_structure_to_query)
query_to_send = addColumnsStructureToQuery(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()));
for (const auto & replicas : cluster->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
@@ -110,7 +145,7 @@ Pipe StorageHDFSCluster::read(
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
queryToString(query_info.original_query),
queryToString(query_to_send),
header,
context,
/*throttler=*/nullptr,
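
Taken together, the flow is: when the structure was inferred rather than supplied, the constructor sets need_to_add_structure_to_query, and read() rewrites the outgoing query so each worker receives the schema instead of re-deriving it. A hypothetical rewrite (column names are illustrative), with a stand-in for the format produced by StorageDictionary::generateNamesAndTypesDescription:

#include <string>
#include <utility>
#include <vector>

/// Query as written by the user:
///   SELECT * FROM hdfsCluster('cluster', 'hdfs://host:9000/data/*.csv')
/// Query sent to each worker after addColumnsStructureToQuery; 'auto' is
/// inserted for the omitted format, then the structure literal is appended:
///   SELECT * FROM hdfsCluster('cluster', 'hdfs://host:9000/data/*.csv', 'auto', 'id Int32, name String')

/// Illustrative stand-in for generateNamesAndTypesDescription: the structure
/// literal is a comma-separated list of "name Type" pairs.
std::string namesAndTypesDescription(const std::vector<std::pair<std::string, std::string>> & columns)
{
    std::string result;
    for (const auto & [name, type] : columns)
    {
        if (!result.empty())
            result += ", ";
        result += name + " " + type;
    }
    return result; /// e.g. "id Int32, name String"
}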

View File

@@ -44,6 +44,7 @@ private:
String uri;
String format_name;
String compression_method;
bool need_to_add_structure_to_query = false;
};

View File

@@ -361,39 +361,6 @@ String StorageS3Source::KeysIterator::next()
return pimpl->next();
}
class StorageS3Source::ReadTasksIterator::Impl
{
public:
explicit Impl(const std::vector<String> & read_tasks_, const ReadTaskCallback & new_read_tasks_callback_)
: read_tasks(read_tasks_), new_read_tasks_callback(new_read_tasks_callback_)
{
}
String next()
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= read_tasks.size())
return new_read_tasks_callback();
return read_tasks[current_index];
}
private:
std::atomic_size_t index = 0;
std::vector<String> read_tasks;
ReadTaskCallback new_read_tasks_callback;
};
StorageS3Source::ReadTasksIterator::ReadTasksIterator(
const std::vector<String> & read_tasks_, const ReadTaskCallback & new_read_tasks_callback_)
: pimpl(std::make_shared<StorageS3Source::ReadTasksIterator::Impl>(read_tasks_, new_read_tasks_callback_))
{
}
String StorageS3Source::ReadTasksIterator::next()
{
return pimpl->next();
}
Block StorageS3Source::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
@@ -802,8 +769,7 @@ StorageS3::StorageS3(
distributed_processing_,
is_key_with_globs,
format_settings,
context_,
&read_tasks_used_in_schema_inference);
context_);
storage_metadata.setColumns(columns);
}
else
@@ -831,19 +797,14 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const std::vector<String> & read_tasks,
std::unordered_map<String, S3::ObjectInfo> * object_infos,
Strings * read_keys)
{
if (distributed_processing)
{
return std::make_shared<StorageS3Source::IteratorWrapper>(
[read_tasks_iterator = std::make_shared<StorageS3Source::ReadTasksIterator>(read_tasks, local_context->getReadTaskCallback()), read_keys]() -> String
{
auto key = read_tasks_iterator->next();
if (read_keys)
read_keys->push_back(key);
return key;
[callback = local_context->getReadTaskCallback()]() -> String {
return callback();
});
}
else if (is_key_with_globs)
@@ -903,7 +864,6 @@ Pipe StorageS3::read(
local_context,
query_info.query,
virtual_block,
read_tasks_used_in_schema_inference,
&object_infos);
ColumnsDescription columns_description;
@@ -1201,7 +1161,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
return getTableStructureFromDataImpl(
configuration.format, s3_configuration, configuration.compression_method, distributed_processing,
s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, object_infos);
}
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@@ -1212,13 +1172,12 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
std::vector<String> * read_keys_in_distributed_processing,
std::unordered_map<String, S3::ObjectInfo> * object_infos)
{
std::vector<String> read_keys;
auto file_iterator
= createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}, {}, object_infos, &read_keys);
= createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}, object_infos, &read_keys);
std::optional<ColumnsDescription> columns_from_cache;
size_t prev_read_keys_size = read_keys.size();
@@ -1271,9 +1230,6 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)
addColumnsToCache(read_keys, s3_configuration, columns, format, format_settings, ctx);
if (distributed_processing && read_keys_in_distributed_processing)
*read_keys_in_distributed_processing = std::move(read_keys);
return columns;
}
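
The deleted ReadTasksIterator existed because a worker could previously run schema inference itself, consuming keys from the initiator's task stream; those keys had to be recorded and replayed for the actual read. Since the initiator now ships the structure inside the query, workers never infer, and a bare callback is enough. A simplified, self-contained sketch of both patterns (plain std types, not the ClickHouse API):

#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <vector>

using ReadTaskCallback = std::function<std::string()>;

/// Old pattern (deleted above): drain the keys already consumed during the
/// worker's own schema inference, then fall back to asking the initiator.
ReadTaskCallback makeReplayingIterator(std::vector<std::string> consumed_keys, ReadTaskCallback callback)
{
    auto index = std::make_shared<std::atomic_size_t>(0);
    auto keys = std::make_shared<std::vector<std::string>>(std::move(consumed_keys));
    return [index, keys, callback]() -> std::string
    {
        size_t i = index->fetch_add(1, std::memory_order_relaxed);
        return i < keys->size() ? (*keys)[i] : callback();
    };
}

/// New pattern: the callback is the iterator; an empty string from the
/// initiator conventionally means "no more tasks".
ReadTaskCallback makeCallbackIterator(ReadTaskCallback callback)
{
    return [callback]() -> std::string { return callback(); };
}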

View File

@@ -66,18 +66,6 @@ public:
std::shared_ptr<Impl> pimpl;
};
class ReadTasksIterator
{
public:
ReadTasksIterator(const std::vector<String> & read_tasks_, const ReadTaskCallback & new_read_tasks_callback_);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<String()>;
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
@@ -238,8 +226,6 @@ private:
ASTPtr partition_by;
bool is_key_with_globs = false;
std::vector<String> read_tasks_used_in_schema_inference;
std::unordered_map<String, S3::ObjectInfo> object_infos;
static void updateS3Configuration(ContextPtr, S3Configuration &);
@@ -252,7 +238,6 @@ private:
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const std::vector<String> & read_tasks = {},
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr,
Strings * read_keys = nullptr);
@@ -264,7 +249,6 @@ private:
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
std::vector<String> * read_keys_in_distributed_processing = nullptr,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr);
bool supportsSubsetOfColumns() const override;

View File

@@ -5,14 +5,9 @@
#if USE_AWS_S3
#include "Common/Exception.h"
#include <Common/Throttler.h>
#include "Client/Connection.h"
#include "Core/QueryProcessingStage.h"
#include <Core/UUID.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
@@ -23,28 +18,54 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include "Processors/ISource.h"
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/StorageDictionary.h>
#include <Common/logger_useful.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <ios>
#include <memory>
#include <string>
#include <thread>
#include <cassert>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static ASTPtr addColumnsStructureToQuery(const ASTPtr & query, const String & structure)
{
/// Add an argument with the table structure to the s3Cluster table function in the SELECT query.
auto result_query = query->clone();
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(result_query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function from s3Cluster, got '{}'", queryToString(query));
auto structure_literal = std::make_shared<ASTLiteral>(structure);
if (expression_list->children.size() < 2 || expression_list->children.size() > 5)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 to 5 arguments in s3Cluster table functions, got {}", expression_list->children.size());
if (expression_list->children.size() == 2 || expression_list->children.size() == 4)
{
auto format_literal = std::make_shared<ASTLiteral>("auto");
expression_list->children.push_back(format_literal);
}
expression_list->children.push_back(structure_literal);
return result_query;
}
StorageS3Cluster::StorageS3Cluster(
const StorageS3ClusterConfiguration & configuration_,
const StorageID & table_id_,
@@ -72,6 +93,7 @@ StorageS3Cluster::StorageS3Cluster(
auto columns = StorageS3::getTableStructureFromDataImpl(format_name, s3_configuration, compression_method,
/*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
need_to_add_structure_to_query = true;
}
else
storage_metadata.setColumns(columns_);
@@ -117,6 +139,11 @@ Pipe StorageS3Cluster::read(
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
ASTPtr query_to_send = query_info.original_query;
if (need_to_add_structure_to_query)
query_to_send = addColumnsStructureToQuery(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()));
for (const auto & replicas : cluster->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
@@ -135,7 +162,7 @@ Pipe StorageS3Cluster::read(
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
queryToString(query_info.original_query),
queryToString(query_to_send),
header,
context,
/*throttler=*/nullptr,
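
The only wrinkle relative to the HDFS variant is arity: s3Cluster may carry credentials, so the format argument can sit at position 3 or 5. A sketch of the rule encoded in addColumnsStructureToQuery above:

#include <cstddef>

/// Pre-rewrite argument layouts and what gets appended:
///   2: (cluster, url)                                           -> 'auto', structure
///   3: (cluster, url, format)                                   -> structure
///   4: (cluster, url, access_key_id, secret_access_key)         -> 'auto', structure
///   5: (cluster, url, access_key_id, secret_access_key, format) -> structure
bool needsAutoFormatPlaceholder(std::size_t arg_count)
{
    /// With 2 or 4 arguments the format is absent, so 'auto' must be pushed
    /// first to keep the appended structure literal in the right position.
    return arg_count == 2 || arg_count == 4;
}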

View File

@@ -46,6 +46,7 @@ private:
String compression_method;
NamesAndTypesList virtual_columns;
Block virtual_block;
bool need_to_add_structure_to_query = false;
};

View File

@@ -48,7 +48,7 @@ void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, Conte
const auto message = fmt::format(
"The signature of table function {} shall be the following:\n" \
" - cluster, uri\n",\
" - cluster, format\n",\
" - cluster, uri, format\n",\
" - cluster, uri, format, structure\n",\
" - cluster, uri, format, structure, compression_method",
getName());

View File

@@ -64,7 +64,7 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
if (args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (FormatFactory::instance().getAllFormats().contains(second_arg))
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}};
else
@@ -77,7 +77,7 @@
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (FormatFactory::instance().getAllFormats().contains(second_arg))
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};

View File

@@ -77,6 +77,7 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
/// StorageS3ClusterConfiguration inherits from StorageS3Configuration, so it is safe to upcast it.
TableFunctionS3::parseArgumentsImpl(message, clipped_args, context, static_cast<StorageS3Configuration &>(configuration));
LOG_DEBUG(&Poco::Logger::get("TableFunctionS3Cluster"), "Structure: {}", configuration.structure);
}