try use another wrapper for table functions

This commit is contained in:
Alexander Tokmakov 2020-08-31 20:45:21 +03:00
parent b521ca9b18
commit f0a5f19dae
24 changed files with 308 additions and 198 deletions

158
src/Storages/StorageProxy.h Normal file
View File

@ -0,0 +1,158 @@
#pragma once
#include <Storages/IStorage.h>
namespace DB
{
class StorageProxy : public IStorage
{
public:
StorageProxy(const StorageID & table_id_) : IStorage(table_id_) {}
virtual StoragePtr getNested() const = 0;
String getName() const override { return "StorageProxy"; }
bool isRemote() const override { return getNested()->isRemote(); }
bool isView() const override { return getNested()->isView(); }
bool supportsSampling() const override { return getNested()->supportsSampling(); }
bool supportsFinal() const override { return getNested()->supportsFinal(); }
bool supportsPrewhere() const override { return getNested()->supportsPrewhere(); }
bool supportsReplication() const override { return getNested()->supportsReplication(); }
bool supportsParallelInsert() const override { return getNested()->supportsParallelInsert(); }
bool supportsDeduplication() const override { return getNested()->supportsDeduplication(); }
bool supportsSettings() const override { return getNested()->supportsSettings(); }
bool noPushingToViews() const override { return getNested()->noPushingToViews(); }
bool hasEvenlyDistributedRead() const override { return getNested()->hasEvenlyDistributedRead(); }
ColumnSizeByName getColumnSizes() const override { return getNested()->getColumnSizes(); }
NamesAndTypesList getVirtuals() const override { return getNested()->getVirtuals(); }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & ast) const override
{
return getNested()->getQueryProcessingStage(context, to_stage, ast);
}
BlockInputStreams watch(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override
{
return getNested()->watch(column_names, query_info, context, processed_stage, max_block_size, num_streams);
}
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override
{
return getNested()->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
BlockOutputStreamPtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const Context & context) override
{
return getNested()->write(query, metadata_snapshot, context);
}
void drop() override { getNested()->drop(); }
void truncate(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const Context & context,
TableExclusiveLockHolder & lock) override
{
getNested()->truncate(query, metadata_snapshot, context, lock);
}
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override
{
getNested()->rename(new_path_to_table_data, new_table_id);
IStorage::renameInMemory(new_table_id);
}
void renameInMemory(const StorageID & new_table_id) override
{
getNested()->renameInMemory(new_table_id);
IStorage::renameInMemory(new_table_id);
}
void alter(const AlterCommands & params, const Context & context, TableLockHolder & alter_lock_holder) override
{
getNested()->alter(params, context, alter_lock_holder);
IStorage::setInMemoryMetadata(getNested()->getInMemoryMetadata());
}
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) const override
{
getNested()->checkAlterIsPossible(commands, settings);
}
Pipe alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & context) override
{
return getNested()->alterPartition(query, metadata_snapshot, commands, context);
}
void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override
{
getNested()->checkAlterPartitionIsPossible(commands, metadata_snapshot, settings);
}
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Context & context) override
{
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, context);
}
void mutate(const MutationCommands & commands, const Context & context) override { getNested()->mutate(commands, context); }
CancellationCode killMutation(const String & mutation_id) override { return getNested()->killMutation(mutation_id); }
void startup() override { getNested()->startup(); }
void shutdown() override { getNested()->shutdown(); }
ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); }
bool supportsIndexForIn() const override { return getNested()->supportsIndexForIn(); }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override
{
return getNested()->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
CheckResults checkData(const ASTPtr & query , const Context & context) override { return getNested()->checkData(query, context); }
void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); }
void checkPartitionCanBeDropped(const ASTPtr & partition) override { getNested()->checkPartitionCanBeDropped(partition); }
Strings getDataPaths() const override { return getNested()->getDataPaths(); }
StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }
std::optional<UInt64> totalRows() const override { return getNested()->totalRows(); }
std::optional<UInt64> totalBytes() const override { return getNested()->totalBytes(); }
std::optional<UInt64> lifetimeRows() const override { return getNested()->lifetimeRows(); }
std::optional<UInt64> lifetimeBytes() const override { return getNested()->lifetimeBytes(); }
};
}

View File

@ -2,6 +2,9 @@
#include <Storages/IStorage.h>
#include <TableFunctions/ITableFunction.h>
#include <Processors/Pipe.h>
#include <Storages/StorageProxy.h>
#include <Common/CurrentThread.h>
#include <Processors/Transforms/ConvertingTransform.h>
namespace DB
{
@ -11,20 +14,53 @@ namespace ErrorCodes
extern const int INCOMPATIBLE_COLUMNS;
}
using GetStructureFunc = std::function<ColumnsDescription()>;
using GetNestedStorageFunc = std::function<StoragePtr()>;
template<typename StorageT>
class StorageTableFunction : public StorageT
class StorageTableFunctionProxy final : public StorageProxy
{
public:
template<typename... StorageArgs>
StorageTableFunction(GetStructureFunc get_structure_, StorageArgs && ... args)
: StorageT(std::forward<StorageArgs>(args)...), get_structure(std::move(get_structure_))
StorageTableFunctionProxy(const StorageID & table_id_, GetNestedStorageFunc get_nested_, ColumnsDescription cached_columns)
: StorageProxy(table_id_), get_nested(std::move(get_nested_))
{
StorageInMemoryMetadata cached_metadata;
cached_metadata.setColumns(std::move(cached_columns));
setInMemoryMetadata(cached_metadata);
}
String getName() const { return "TableFunction" + StorageT::getName(); }
StoragePtr getNested() const override
{
std::lock_guard lock{nested_mutex};
if (nested)
return nested;
auto nested_storage = get_nested();
nested_storage->startup();
nested = nested_storage;
get_nested = {};
return nested;
}
StoragePtr maybeGetNested() const
{
std::lock_guard lock{nested_mutex};
return nested;
}
String getName() const override
{
std::lock_guard lock{nested_mutex};
if (nested)
return nested->getName();
return StorageProxy::getName();
}
void startup() override { }
void shutdown() override
{
auto storage = maybeGetNested();
if (storage)
storage->shutdown();
}
Pipe read(
const Names & column_names,
@ -33,38 +69,51 @@ public:
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
unsigned num_streams) override
{
assertSourceStructure();
return StorageT::read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
auto storage = getNested();
auto nested_metadata = storage->getInMemoryMetadataPtr();
auto pipe = storage->read(column_names, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);
if (!pipe.empty())
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header,
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()),
ConvertingTransform::MatchColumnsMode::Name);
});
}
return pipe;
}
BlockOutputStreamPtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const Context & context)
const Context & context) override
{
assertSourceStructure();
return StorageT::write(query, metadata_snapshot, context);
auto storage = getNested();
auto cached_structure = metadata_snapshot->getSampleBlock();
auto actual_structure = storage->getInMemoryMetadataPtr()->getSampleBlock();
if (!blocksHaveEqualStructure(actual_structure, cached_structure))
{
throw Exception("Source storage and table function have different structure", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
return storage->write(query, metadata_snapshot, context);
}
void renameInMemory(const StorageID & new_table_id) override
{
if (maybeGetNested())
StorageProxy::renameInMemory(new_table_id);
else
IStorage::renameInMemory(new_table_id);
}
private:
void assertSourceStructure()
{
if (!get_structure)
return;
StorageInMemoryMetadata source_metadata;
source_metadata.setColumns(get_structure());
actual_source_structure = source_metadata.getSampleBlock();
if (!blocksHaveEqualStructure(StorageT::getInMemoryMetadataPtr()->getSampleBlock(), actual_source_structure))
throw Exception("Source storage and table function have different structure", ErrorCodes::INCOMPATIBLE_COLUMNS);
get_structure = {};
}
GetStructureFunc get_structure;
Block actual_source_structure;
mutable std::mutex nested_mutex;
mutable GetNestedStorageFunc get_nested;
mutable StoragePtr nested;
};
}

View File

@ -1,6 +1,7 @@
#include <TableFunctions/ITableFunction.h>
#include <Interpreters/Context.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageTableFunction.h>
#include <Access/AccessFlags.h>
#include <Common/ProfileEvents.h>
@ -18,7 +19,17 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context &
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
context.checkAccess(AccessType::CREATE_TEMPORARY_TABLE | StorageFactory::instance().getSourceAccessType(getStorageTypeName()));
cached_columns = std::move(cached_columns_);
return executeImpl(ast_function, context, table_name);
bool no_conversion_required = hasStaticStructure() && cached_columns == getActualTableStructure(ast_function, context);
if (cached_columns.empty() || no_conversion_required)
return executeImpl(ast_function, context, table_name);
auto get_storage = [=, tf = shared_from_this()]() -> StoragePtr
{
return tf->executeImpl(ast_function, context, table_name);
};
return std::make_shared<StorageTableFunctionProxy>(StorageID(getDatabaseName(), table_name), std::move(get_storage), cached_columns);
}
}

View File

@ -32,7 +32,9 @@ public:
/// Get the main function name.
virtual std::string getName() const = 0;
virtual ColumnsDescription getActualTableStructure(const ASTPtr & /*ast_function*/, const Context & /*context*/) const { return {}; }
virtual bool hasStaticStructure() const { return false; }
virtual ColumnsDescription getActualTableStructure(const ASTPtr & /*ast_function*/, const Context & /*context*/) const = 0;
/// Create storage according to the query.
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns_ = {}) const;

View File

@ -63,19 +63,9 @@ void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, const C
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
{
return tf->getActualTableStructure(ast_function, context);
};
/// Create table
StoragePtr storage = getStorage(filename, format, cached_columns, const_cast<Context &>(context), table_name, compression_method, std::move(get_structure));
auto columns = getActualTableStructure(ast_function, context);
StoragePtr storage = getStorage(filename, format, columns, const_cast<Context &>(context), table_name, compression_method);
storage->startup();
return storage;
}

View File

@ -18,7 +18,7 @@ private:
virtual StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method, GetStructureFunc get_structure) const = 0;
const std::string & table_name, const String & compression_method) const = 0;
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;

View File

@ -92,19 +92,8 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(const ASTPtr & as
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
{
return tf->getActualTableStructure(ast_function, context);
};
auto result = std::make_shared<StorageTableFunction<StorageXDBC>>(get_structure, StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, cached_columns, context, helper);
if (!result)
throw Exception("Failed to instantiate storage from table function " + getName(), ErrorCodes::UNKNOWN_EXCEPTION);
auto columns = getActualTableStructure(ast_function, context);
auto result = std::make_shared<StorageXDBC>(StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, context, helper);
result->startup();
return result;
}

View File

@ -10,11 +10,11 @@ namespace DB
{
StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method, GetStructureFunc get_structure) const
const std::string & table_name, const std::string & compression_method) const
{
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format, compression_method, columns, ConstraintsDescription{}, global_context};
return std::make_shared<StorageTableFunction<StorageFile>>(std::move(get_structure), source, global_context.getUserFilesPath(), args);
return StorageFile::create(source, global_context.getUserFilesPath(), args);
}
void registerTableFunctionFile(TableFunctionFactory & factory)

View File

@ -24,6 +24,6 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method, GetStructureFunc get_structure) const override;
const std::string & table_name, const std::string & compression_method) const override;
const char * getStorageTypeName() const override { return "File"; }
};}

View File

@ -89,15 +89,8 @@ ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(const AS
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
{
return tf->getActualTableStructure(ast_function, context);
};
auto res = std::make_shared<StorageTableFunction<StorageGenerateRandom>>(get_structure, StorageID(getDatabaseName(), table_name), cached_columns, max_array_length, max_string_length, random_seed);
auto columns = getActualTableStructure(ast_function, context);
auto res = StorageGenerateRandom::create(StorageID(getDatabaseName(), table_name), columns, max_array_length, max_string_length, random_seed);
res->startup();
return res;
}

View File

@ -11,9 +11,9 @@ namespace DB
{
StoragePtr TableFunctionHDFS::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method, GetStructureFunc get_structure) const
const std::string & table_name, const String & compression_method) const
{
return std::make_shared<StorageTableFunction<StorageHDFS>>(std::move(get_structure),
return StorageHDFS::create(
source,
StorageID(getDatabaseName(), table_name),
format,

View File

@ -27,7 +27,7 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method, GetStructureFunc get_structure) const override;
const std::string & table_name, const String & compression_method) const override;
const char * getStorageTypeName() const override { return "HDFS"; }
};

View File

@ -52,18 +52,8 @@ ColumnsDescription TableFunctionInput::getActualTableStructure(const ASTPtr & as
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
{
return tf->getActualTableStructure(ast_function, context);
};
StoragePtr storage = std::make_shared<StorageTableFunction<StorageInput>>(get_structure, StorageID(getDatabaseName(), table_name), cached_columns);
auto storage = StorageInput::create(StorageID(getDatabaseName(), table_name), getActualTableStructure(ast_function, context));
storage->startup();
return storage;
}

View File

@ -78,20 +78,14 @@ ColumnsDescription TableFunctionMerge::getActualTableStructure(const ASTPtr & as
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
{
return tf->getActualTableStructure(ast_function, context);
};
auto res = std::make_shared<StorageTableFunction<StorageMerge>>(std::move(get_structure),
auto res = StorageMerge::create(
StorageID(getDatabaseName(), table_name),
cached_columns,
getActualTableStructure(ast_function, context),
source_database,
table_name_regexp,
context);
res->startup();
return res;
}

View File

@ -124,24 +124,19 @@ ColumnsDescription TableFunctionMySQL::getActualTableStructure(const ASTPtr & as
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
if (!pool)
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
auto get_structure = [=, tf = shared_from_this()]()
{
return tf->getActualTableStructure(ast_function, context);
};
auto columns = getActualTableStructure(ast_function, context);
auto res = std::make_shared<StorageTableFunction<StorageMySQL>>(std::move(get_structure),
auto res = StorageMySQL::create(
StorageID(getDatabaseName(), table_name),
std::move(*pool),
remote_database_name,
remote_table_name,
replace_query,
on_duplicate_clause,
cached_columns,
columns,
ConstraintsDescription{},
context);

View File

@ -20,29 +20,6 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <bool multithreaded>
void TableFunctionNumbers<multithreaded>::parseArguments(const ASTPtr & ast_function, const Context & context) const
{
if (const auto * function = ast_function->as<ASTFunction>())
{
auto arguments = function->arguments->children;
if (arguments.size() != 1 && arguments.size() != 2)
throw Exception("Table function '" + getName() + "' requires 'length' or 'offset, length'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() == 1)
length = evaluateArgument(context, arguments[0]);
else
{
offset = evaluateArgument(context, arguments[0]);
length = evaluateArgument(context, arguments[1]);
}
}
else
throw Exception("Table function '" + getName() + "' requires 'limit' or 'offset, limit'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
template <bool multithreaded>
ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(const ASTPtr & /*ast_function*/, const Context & /*context*/) const
@ -53,19 +30,21 @@ ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(
template <bool multithreaded>
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
if (const auto * function = ast_function->as<ASTFunction>())
{
return tf->getActualTableStructure(ast_function, context);
};
auto arguments = function->arguments->children;
auto res = std::make_shared<StorageTableFunction<StorageSystemNumbers>>(get_structure, StorageID(getDatabaseName(), table_name), multithreaded, length, offset, false);
res->startup();
return res;
if (arguments.size() != 1 && arguments.size() != 2)
throw Exception("Table function '" + getName() + "' requires 'length' or 'offset, length'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
auto res = StorageSystemNumbers::create(StorageID(getDatabaseName(), table_name), multithreaded, length, offset, false);
res->startup();
return res;
}
throw Exception("Table function '" + getName() + "' requires 'limit' or 'offset, limit'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
void registerTableFunctionNumbers(TableFunctionFactory & factory)

View File

@ -24,10 +24,6 @@ private:
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
mutable UInt64 offset = 0;
mutable UInt64 length;
};

View File

@ -201,14 +201,11 @@ void TableFunctionRemote::prepareClusterInfo(const ASTPtr & ast_function, const
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
prepareClusterInfo(ast_function, context);
if (cached_columns.empty())
cached_columns = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
//auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
StoragePtr res = remote_table_function_ptr
? StorageDistributed::create(
StorageID(getDatabaseName(), table_name),
cached_columns,
getActualTableStructure(ast_function, context),
ConstraintsDescription{},
remote_table_function_ptr,
String{},
@ -220,7 +217,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
cluster)
: StorageDistributed::create(
StorageID(getDatabaseName(), table_name),
cached_columns,
getActualTableStructure(ast_function, context),
ConstraintsDescription{},
remote_table_id.database_name,
remote_table_id.table_name,

View File

@ -70,26 +70,18 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Conte
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
{
return tf->getActualTableStructure(ast_function, context);
};
Poco::URI uri (filename);
S3::URI s3_uri (uri);
UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
StoragePtr storage = std::make_shared<StorageTableFunction<StorageS3>>(std::move(get_structure),
StoragePtr storage = StorageS3::create(
s3_uri,
access_key_id,
secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
min_upload_part_size,
cached_columns,
getActualTableStructure(ast_function, context),
ConstraintsDescription{},
const_cast<Context &>(context),
compression_method);

View File

@ -11,11 +11,10 @@ namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method, GetStructureFunc get_structure) const
const std::string & table_name, const String & compression_method) const
{
Poco::URI uri(source);
return std::make_shared<StorageTableFunction<StorageURL>>(std::move(get_structure), uri,
StorageID(getDatabaseName(), table_name), format, columns, ConstraintsDescription{},
return StorageURL::create( uri, StorageID(getDatabaseName(), table_name), format, columns, ConstraintsDescription{},
global_context, compression_method);
}

View File

@ -22,7 +22,7 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method, GetStructureFunc get_structure) const override;
const std::string & table_name, const String & compression_method) const override;
const char * getStorageTypeName() const override { return "URL"; }
};

View File

@ -100,16 +100,10 @@ StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const C
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
{
return tf->getActualTableStructure(ast_function, context);
};
auto columns = getActualTableStructure(ast_function, context);
Block sample_block;
for (const auto & name_type : cached_columns.getOrdinary())
for (const auto & name_type : columns.getOrdinary())
sample_block.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
MutableColumns res_columns = sample_block.cloneEmptyColumns();
@ -121,7 +115,7 @@ StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const C
Block res_block = sample_block.cloneWithColumns(std::move(res_columns));
auto res = std::make_shared<StorageTableFunction<StorageValues>>(get_structure, StorageID(getDatabaseName(), table_name), cached_columns, res_block);
auto res = StorageValues::create(StorageID(getDatabaseName(), table_name), columns, res_block);
res->startup();
return res;
}

View File

@ -19,23 +19,6 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <bool multithreaded>
void TableFunctionZeros<multithreaded>::parseArguments(const ASTPtr & ast_function, const Context & context) const
{
if (const auto * function = ast_function->as<ASTFunction>())
{
auto arguments = function->arguments->children;
if (arguments.size() != 1)
throw Exception("Table function '" + getName() + "' requires 'length'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
length = evaluateArgument(context, arguments[0]);
}
else
throw Exception("Table function '" + getName() + "' requires 'limit'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
template <bool multithreaded>
ColumnsDescription TableFunctionZeros<multithreaded>::getActualTableStructure(const ASTPtr & /*ast_function*/, const Context & /*context*/) const
@ -46,19 +29,21 @@ ColumnsDescription TableFunctionZeros<multithreaded>::getActualTableStructure(co
template <bool multithreaded>
StoragePtr TableFunctionZeros<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
parseArguments(ast_function, context);
if (cached_columns.empty())
cached_columns = getActualTableStructure(ast_function, context);
auto get_structure = [=, tf = shared_from_this()]()
if (const auto * function = ast_function->as<ASTFunction>())
{
return tf->getActualTableStructure(ast_function, context);
};
auto arguments = function->arguments->children;
auto res = std::make_shared<StorageTableFunction<StorageSystemZeros>>(std::move(get_structure), StorageID(getDatabaseName(), table_name), multithreaded, length);
res->startup();
return res;
if (arguments.size() != 1)
throw Exception("Table function '" + getName() + "' requires 'length'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
UInt64 length = evaluateArgument(context, arguments[0]);
auto res = StorageSystemZeros::create(StorageID(getDatabaseName(), table_name), multithreaded, length);
res->startup();
return res;
}
throw Exception("Table function '" + getName() + "' requires 'limit'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
void registerTableFunctionZeros(TableFunctionFactory & factory)

View File

@ -24,9 +24,6 @@ private:
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
mutable UInt64 length;
};