Try fix test

This commit is contained in:
Nikolai Kochetov 2019-11-08 20:30:56 +03:00
parent 9fa8133a1f
commit f909575196
2 changed files with 14 additions and 19 deletions

View File

@ -11,6 +11,7 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <common/logger_useful.h>
#include <DataStreams/ConvertingBlockInputStream.h>
namespace ProfileEvents
@ -66,7 +67,7 @@ SelectStreamFactory::SelectStreamFactory(
namespace
{
BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context & context, QueryProcessingStage::Enum processed_stage)
BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
{
checkStackSize();
@ -83,7 +84,7 @@ BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context &
*/
/// return std::make_shared<MaterializingBlockInputStream>(stream);
return stream;
return std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
}
static String formattedAST(const ASTPtr & ast)
@ -109,7 +110,7 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]()
{
res.emplace_back(createLocalStream(modified_query_ast, context, processed_stage));
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage));
};
String modified_query = formattedAST(modified_query_ast);
@ -249,7 +250,7 @@ void SelectStreamFactory::createForShard(
}
if (try_results.empty() || local_delay < max_remote_delay)
return createLocalStream(modified_query_ast, context, stage);
return createLocalStream(modified_query_ast, header, context, stage);
else
{
std::vector<IConnectionPool::Entry> connections;

View File

@ -47,7 +47,6 @@
#include <memory>
#include <filesystem>
#include "VirtualColumnUtils.h"
namespace DB
@ -320,28 +319,23 @@ BlockInputStreams StorageDistributed::read(
const Settings & settings = context.getSettingsRef();
bool has_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
auto query_for_header = query_info.query;
if (has_shard_num_column)
{
query_for_header = query_for_header->clone();
VirtualColumnUtils::rewriteEntityInAst(query_for_header, "_shard_num", 0, "toUInt32");
}
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
Block header =
InterpreterSelectQuery(query_for_header, context, SelectQueryOptions(processed_stage)).getSampleBlock();
auto modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock();
const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num"))
has_virtual_shard_num_column = false;
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
? ClusterProxy::SelectStreamFactory(
header, processed_stage, remote_table_function_ptr, scalars, has_shard_num_column, context.getExternalTables())
header, processed_stage, remote_table_function_ptr, scalars, has_virtual_shard_num_column, context.getExternalTables())
: ClusterProxy::SelectStreamFactory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, has_shard_num_column, context.getExternalTables());
header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, has_virtual_shard_num_column, context.getExternalTables());
if (settings.optimize_skip_unused_shards)
{