Fixed COLUMN_NOT_FOUND in block issue

Smita Kulkarni 2023-06-05 23:15:13 +02:00
parent dedb9067ce
commit aa6f4e43c5
5 changed files with 34 additions and 17 deletions
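In brief: StorageAzure::read now honours the requested column_names, deriving block_for_format from the storage snapshot's description of those columns instead of the full metadata sample block. StorageAzureSource receives the resulting ColumnsDescription, initialises ISource with the real sample block rather than an empty Block(), and wires in an AddingDefaultsTransform when the description carries default expressions. The remaining hunks adjust debug logging in StorageS3, TableFunctionAzure, and TableFunctionS3.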

View File

@@ -476,7 +476,7 @@ private:
 Pipe StorageAzure::read(
-    const Names & /*column_names*/ ,
+    const Names & column_names ,
     const StorageSnapshotPtr & storage_snapshot,
     SelectQueryInfo & /*query_info*/,
    ContextPtr context,
@@ -491,13 +491,11 @@ Pipe StorageAzure::read(
         objects.emplace_back(key);
     auto reader = object_storage->readObjects(objects);
-    auto block_for_format = storage_snapshot->metadata->getSampleBlock();
-    for (auto col : block_for_format.getColumns())
-        LOG_INFO(&Poco::Logger::get("StorageAzure"), "read col = {}",col->getName());
+    auto columns_description = storage_snapshot->getDescriptionForColumns(column_names);
+    auto block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
-    pipes.emplace_back(std::make_shared<StorageAzureSource>(std::move(reader), context, block_for_format, max_block_size));
+    pipes.emplace_back(std::make_shared<StorageAzureSource>(std::move(reader), context, block_for_format, max_block_size, columns_description));
     return Pipe::unitePipes(std::move(pipes));
@@ -583,12 +581,13 @@ bool StorageAzure::supportsPartitionBy() const
 StorageAzureSource::StorageAzureSource (std::unique_ptr<ReadBufferFromFileBase> && read_buffer_, ContextPtr context_,
-    const Block & sample_block_,UInt64 max_block_size_)
-    :ISource(Block())
+    const Block & sample_block_,UInt64 max_block_size_, const ColumnsDescription & columns_)
+    :ISource(sample_block_)
     , WithContext(context_)
     , read_buffer(std::move(read_buffer_))
     , sample_block(sample_block_)
     , max_block_size(max_block_size_)
+    , columns_desc(columns_)
 {
     auto format = "TSV";
@@ -598,6 +597,13 @@ StorageAzureSource::StorageAzureSource (std::unique_ptr<ReadBufferFromFileBase>
     QueryPipelineBuilder builder;
     builder.init(Pipe(input_format));
+    if (columns_desc.hasDefaults())
+    {
+        builder.addSimpleTransform(
+            [&](const Block & header)
+            { return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext()); });
+    }
     pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
     reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
 }
@@ -605,13 +611,16 @@ StorageAzureSource::StorageAzureSource (std::unique_ptr<ReadBufferFromFileBase>
 Chunk StorageAzureSource::generate()
 {
-    Chunk chunk;
-    if (reader->pull(chunk))
+    while(true)
     {
-        LOG_INFO(&Poco::Logger::get("StorageAzureSource"), "pulled chunk rows = {}", chunk.getNumRows());
+        Chunk chunk;
+        if (reader->pull(chunk))
+        {
+            LOG_INFO(&Poco::Logger::get("StorageAzureSource"), "pulled chunk rows = {}", chunk.getNumRows());
+        }
+        return chunk;
     }
-    return chunk;
+    // return {};
 }
 String StorageAzureSource::getName() const
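
The ISource(Block()) to ISource(sample_block_) change above is the core of the fix: the source previously advertised an empty header, so any downstream step resolving the requested columns by name had nothing to find. Below is a minimal, self-contained sketch of that failure mode, using a toy Block as a stand-in rather than ClickHouse's real Block/ISource types:

// Toy illustration of the COLUMN_NOT_FOUND failure mode; Block here is a
// simplified stand-in, not ClickHouse's Block.
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct Block
{
    std::vector<std::string> column_names;

    // By-name lookup, mirroring the kind of resolution that fails
    // when a source declares an empty header.
    size_t getPositionByName(const std::string & name) const
    {
        for (size_t i = 0; i < column_names.size(); ++i)
            if (column_names[i] == name)
                return i;
        throw std::runtime_error("COLUMN_NOT_FOUND: no column '" + name + "' in header");
    }
};

int main()
{
    Block empty_header{};                   // old: ISource(Block())
    Block sample_block{{"name", "value"}};  // new: ISource(sample_block_)

    try
    {
        empty_header.getPositionByName("value"); // throws, like the reported bug
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';
    }

    std::cout << "fixed header finds 'value' at position "
              << sample_block.getPositionByName("value") << '\n';
}

With the header seeded from the real sample block (and defaults filled in by AddingDefaultsTransform where the columns define them), the by-name lookup succeeds.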

View File

@@ -128,7 +128,7 @@ private:
 class StorageAzureSource : public ISource, WithContext
 {
 public:
-    StorageAzureSource (std::unique_ptr<ReadBufferFromFileBase> && read_buffer_, ContextPtr context_, const Block & sample_block_, UInt64 max_block_size_);
+    StorageAzureSource (std::unique_ptr<ReadBufferFromFileBase> && read_buffer_, ContextPtr context_, const Block & sample_block_, UInt64 max_block_size_, const ColumnsDescription & columns_);
     ~StorageAzureSource() override {}
     Chunk generate() override;
@@ -145,6 +145,7 @@ private:
     std::unique_ptr<PullingPipelineExecutor> reader;
     Block sample_block;
     UInt64 max_block_size;
+    ColumnsDescription columns_desc;
     // void createReader();
 };

View File

@@ -957,6 +957,9 @@ StorageS3::StorageS3(
         {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
     auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
+    LOG_INFO(&Poco::Logger::get("StorageS3"), "constructor columns = {}", columns.toString());
     virtual_columns = getVirtualsForStorage(columns, default_virtuals);
     for (const auto & column : virtual_columns)
         virtual_block.insert({column.type->createColumn(), column.type, column.name});

View File

@@ -54,7 +54,8 @@ void TableFunctionAzure::parseArgumentsImpl(ASTs & args, const ContextPtr & cont
 void TableFunctionAzure::parseArguments(const ASTPtr & ast_function, ContextPtr context)
 {
+    LOG_INFO(&Poco::Logger::get("TableFunctionAzure"), "parseArguments = {}", ast_function->dumpTree());
     /// Clone ast function, because we can modify its arguments like removing headers.
     auto ast_copy = ast_function->clone();
     ASTs & args_func = ast_function->children;
@@ -78,8 +79,6 @@ bool TableFunctionAzure::supportsReadingSubsetOfColumns()
 StoragePtr TableFunctionAzure::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
 {
-    LOG_INFO(&Poco::Logger::get("TableFunctionAzure"), "executeImpl = {}", table_name);
     ColumnsDescription columns;
     columns = parseColumnsListFromString(configuration.structure, context);

View File

@@ -294,6 +294,8 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
 ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context) const
 {
+    LOG_INFO(&Poco::Logger::get("TableFunctionS3"), "getActualTableStructure configuration.structure = {} ",configuration.structure);
     if (configuration.structure == "auto")
     {
         context->checkAccess(getSourceAccessType());
@@ -319,6 +321,9 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
     else if (!structure_hint.empty())
         columns = structure_hint;
+    LOG_INFO(&Poco::Logger::get("TableFunctionS3"), "executeImpl structre = {} structure_hint = {} ",configuration.structure, structure_hint.getAll().toString());
     StoragePtr storage = std::make_shared<StorageS3>(
         configuration,
         context,