fix query header

This commit is contained in:
Alexander Tokmakov 2020-09-01 13:37:42 +03:00
parent f0a5f19dae
commit b6093d9a86
9 changed files with 43 additions and 28 deletions

View File

@ -178,7 +178,7 @@ Pipe StorageMerge::read(
modified_context->setSetting("optimize_move_to_prewhere", false);
/// What will be result structure depending on query processed stage in source tables?
Block header = getQueryHeader(column_names, metadata_snapshot, query_info, context, processed_stage);
Block header = getQueryHeader(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
@ -431,6 +431,7 @@ void StorageMerge::alter(
}
Block StorageMerge::getQueryHeader(
const IStorage & storage,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -441,7 +442,7 @@ Block StorageMerge::getQueryHeader(
{
case QueryProcessingStage::FetchColumns:
{
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
@ -457,7 +458,7 @@ Block StorageMerge::getQueryHeader(
removeJoin(*query->as<ASTSelectQuery>());
auto stream = std::make_shared<OneBlockInputStream>(
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()));
metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()));
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
}

View File

@ -47,6 +47,14 @@ public:
bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override;
static Block getQueryHeader(
const IStorage & storage,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage);
private:
String source_database;
OptimizedRegularExpression table_name_regexp;
@ -75,13 +83,6 @@ protected:
const String & table_name_regexp_,
const Context & context_);
Block getQueryHeader(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage);
Pipe createSources(
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,

View File

@ -6,6 +6,9 @@
#include <Common/CurrentThread.h>
#include <Processors/Transforms/ConvertingTransform.h>
//#include <common/logger_useful.h>
#include <Storages/StorageMerge.h>
namespace DB
{
@ -25,14 +28,17 @@ public:
StorageInMemoryMetadata cached_metadata;
cached_metadata.setColumns(std::move(cached_columns));
setInMemoryMetadata(cached_metadata);
//log = &Poco::Logger::get("TABLE_FUNCTION_PROXY");
}
StoragePtr getNested() const override
{
//LOG_WARNING(log, "getNested()");
std::lock_guard lock{nested_mutex};
if (nested)
return nested;
//LOG_WARNING(log, "getNested() creating");
auto nested_storage = get_nested();
nested_storage->startup();
nested = nested_storage;
@ -71,6 +77,10 @@ public:
size_t max_block_size,
unsigned num_streams) override
{
String cnames;
for (const auto & c : column_names)
cnames += c + " ";
//LOG_WARNING(log, "read() {} cols: {}", QueryProcessingStage::toString(processed_stage), cnames);
auto storage = getNested();
auto nested_metadata = storage->getInMemoryMetadataPtr();
auto pipe = storage->read(column_names, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);
@ -78,9 +88,11 @@ public:
{
pipe.addSimpleTransform([&](const Block & header)
{
auto to = StorageMerge::getQueryHeader(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
//LOG_WARNING(log, "try convert \n{}\n to \n{}\n", header.dumpStructure(), to.dumpStructure());
return std::make_shared<ConvertingTransform>(
header,
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()),
to,
ConvertingTransform::MatchColumnsMode::Name);
});
}
@ -114,6 +126,7 @@ private:
mutable std::mutex nested_mutex;
mutable GetNestedStorageFunc get_nested;
mutable StoragePtr nested;
//mutable Poco::Logger * log;
};
}

View File

@ -9,10 +9,10 @@
namespace DB
{
StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method) const
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method_) const
{
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format, compression_method, columns, ConstraintsDescription{}, global_context};
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format_, compression_method_, columns, ConstraintsDescription{}, global_context};
return StorageFile::create(source, global_context.getUserFilesPath(), args);
}

View File

@ -23,7 +23,7 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method) const override;
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method_) const override;
const char * getStorageTypeName() const override { return "File"; }
};}

View File

@ -10,17 +10,17 @@
namespace DB
{
StoragePtr TableFunctionHDFS::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method) const
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method_) const
{
return StorageHDFS::create(
source,
StorageID(getDatabaseName(), table_name),
format,
format_,
columns,
ConstraintsDescription{},
global_context,
compression_method);
compression_method_);
}

View File

@ -26,8 +26,8 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method) const override;
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "HDFS"; }
};

View File

@ -10,12 +10,12 @@
namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method) const
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method_) const
{
Poco::URI uri(source);
return StorageURL::create( uri, StorageID(getDatabaseName(), table_name), format, columns, ConstraintsDescription{},
global_context, compression_method);
return StorageURL::create( uri, StorageID(getDatabaseName(), table_name), format_, columns, ConstraintsDescription{},
global_context, compression_method_);
}
void registerTableFunctionURL(TableFunctionFactory & factory)

View File

@ -21,8 +21,8 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method) const override;
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "URL"; }
};