mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 21:24:28 +00:00
slightly better code
This commit is contained in:
parent
b6093d9a86
commit
3a9779adb4
@ -54,9 +54,8 @@ std::pair<String, StoragePtr> createTableFromAST(
|
||||
|
||||
if (ast_create_query.as_table_function)
|
||||
{
|
||||
const auto & ast_table_function = ast_create_query.as_table_function->as<ASTFunction &>();
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
auto table_function = factory.get(ast_table_function.name, context);
|
||||
auto table_function = factory.get(ast_create_query.as_table_function, context);
|
||||
ColumnsDescription columns;
|
||||
if (ast_create_query.columns_list && ast_create_query.columns_list->columns)
|
||||
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, false);
|
||||
|
@ -154,8 +154,7 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
if (table_func_ptr)
|
||||
{
|
||||
const auto * table_function = table_func_ptr->as<ASTFunction>();
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
|
||||
main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
|
||||
}
|
||||
else
|
||||
|
@ -916,7 +916,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
|
||||
|
||||
if (!res)
|
||||
{
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this);
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression, *this);
|
||||
|
||||
/// Run it and remember the result
|
||||
res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName());
|
||||
|
@ -497,9 +497,9 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
|
||||
else if (create.as_table_function)
|
||||
{
|
||||
/// Table function without columns list.
|
||||
auto table_function = TableFunctionFactory::instance().get(create.as_table_function->as<ASTFunction &>().name, context);
|
||||
properties.columns = table_function->getActualTableStructure(create.as_table_function, context);
|
||||
if (properties.columns.empty()) //FIXME
|
||||
auto table_function = TableFunctionFactory::instance().get(create.as_table_function, context);
|
||||
properties.columns = table_function->getActualTableStructure(context);
|
||||
if (properties.columns.empty()) //FIXME TableFunctionFile may return empty structure for Distributed format
|
||||
return {};
|
||||
}
|
||||
else
|
||||
@ -768,9 +768,8 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
/// NOTE: CREATE query may be rewritten by Storage creator or table function
|
||||
if (create.as_table_function)
|
||||
{
|
||||
const auto & table_function = create.as_table_function->as<ASTFunction &>();
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table, properties.columns);
|
||||
res = factory.get(create.as_table_function, context)->execute(create.as_table_function, context, create.table, properties.columns);
|
||||
res->renameInMemory({create.database, create.table, create.uuid});
|
||||
}
|
||||
else
|
||||
|
@ -72,23 +72,16 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
|
||||
table_expression.subquery->children.at(0), context).getNamesAndTypesList();
|
||||
columns = ColumnsDescription(std::move(names_and_types));
|
||||
}
|
||||
else
|
||||
else if (table_expression.table_function)
|
||||
{
|
||||
StoragePtr table;
|
||||
if (table_expression.table_function)
|
||||
{
|
||||
const auto & table_function = table_expression.table_function->as<ASTFunction &>();
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function.name, context);
|
||||
/// Run the table function and remember the result
|
||||
table = table_function_ptr->execute(table_expression.table_function, context, table_function_ptr->getName());
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, context);
|
||||
columns = table_function_ptr->getActualTableStructure(context);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto table_id = context.resolveStorageID(table_expression.database_and_table_name);
|
||||
context.checkAccess(AccessType::SHOW_COLUMNS, table_id);
|
||||
table = DatabaseCatalog::instance().getTable(table_id, context);
|
||||
}
|
||||
|
||||
auto table = DatabaseCatalog::instance().getTable(table_id, context);
|
||||
auto table_lock = table->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
auto metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||
columns = metadata_snapshot->getColumns();
|
||||
|
@ -59,9 +59,8 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
|
||||
{
|
||||
if (query.table_function)
|
||||
{
|
||||
const auto * table_function = query.table_function->as<ASTFunction>();
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
TableFunctionPtr table_function_ptr = factory.get(table_function->name, context);
|
||||
TableFunctionPtr table_function_ptr = factory.get(query.table_function, context);
|
||||
return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
|
||||
}
|
||||
|
||||
|
66
src/Interpreters/getHeaderForProcessingStage.cpp
Normal file
66
src/Interpreters/getHeaderForProcessingStage.cpp
Normal file
@ -0,0 +1,66 @@
|
||||
#include <Interpreters/getHeaderForProcessingStage.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
/// Rewrite original query removing joined tables from it
|
||||
void removeJoin(const ASTSelectQuery & select)
|
||||
{
|
||||
const auto & tables = select.tables();
|
||||
if (!tables || tables->children.size() < 2)
|
||||
return;
|
||||
|
||||
const auto & joined_table = tables->children[1]->as<ASTTablesInSelectQueryElement &>();
|
||||
if (!joined_table.table_join)
|
||||
return;
|
||||
|
||||
/// The most simple temporary solution: leave only the first table in query.
|
||||
/// TODO: we also need to remove joined columns and related functions (taking in account aliases if any).
|
||||
tables->children.resize(1);
|
||||
}
|
||||
|
||||
Block getHeaderForProcessingStage(
|
||||
const IStorage & storage,
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage)
|
||||
{
|
||||
switch (processed_stage)
|
||||
{
|
||||
case QueryProcessingStage::FetchColumns:
|
||||
{
|
||||
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
|
||||
if (query_info.prewhere_info)
|
||||
{
|
||||
query_info.prewhere_info->prewhere_actions->execute(header);
|
||||
if (query_info.prewhere_info->remove_prewhere_column)
|
||||
header.erase(query_info.prewhere_info->prewhere_column_name);
|
||||
}
|
||||
return header;
|
||||
}
|
||||
case QueryProcessingStage::WithMergeableState:
|
||||
case QueryProcessingStage::Complete:
|
||||
{
|
||||
auto query = query_info.query->clone();
|
||||
removeJoin(*query->as<ASTSelectQuery>());
|
||||
|
||||
auto stream = std::make_shared<OneBlockInputStream>(
|
||||
metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()));
|
||||
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
||||
}
|
||||
}
|
||||
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
}
|
||||
|
27
src/Interpreters/getHeaderForProcessingStage.h
Normal file
27
src/Interpreters/getHeaderForProcessingStage.h
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
#include <Core/Block.h>
|
||||
#include <Core/Names.h>
|
||||
#include <Core/QueryProcessingStage.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IStorage;
|
||||
struct StorageInMemoryMetadata;
|
||||
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
|
||||
struct SelectQueryInfo;
|
||||
class Context;
|
||||
class ASTSelectQuery;
|
||||
|
||||
void removeJoin(const ASTSelectQuery & select);
|
||||
|
||||
Block getHeaderForProcessingStage(
|
||||
const IStorage & storage,
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage);
|
||||
|
||||
}
|
@ -18,7 +18,7 @@ namespace DB
|
||||
class StorageFileBlockInputStream;
|
||||
class StorageFileBlockOutputStream;
|
||||
|
||||
class StorageFile : public ext::shared_ptr_helper<StorageFile>, public IStorage
|
||||
class StorageFile final : public ext::shared_ptr_helper<StorageFile>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageFile>;
|
||||
public:
|
||||
|
@ -9,7 +9,7 @@ namespace DB
|
||||
{
|
||||
/* Generates random data for given schema.
|
||||
*/
|
||||
class StorageGenerateRandom : public ext::shared_ptr_helper<StorageGenerateRandom>, public IStorage
|
||||
class StorageGenerateRandom final : public ext::shared_ptr_helper<StorageGenerateRandom>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageGenerateRandom>;
|
||||
public:
|
||||
|
@ -13,7 +13,7 @@ namespace DB
|
||||
* This class represents table engine for external hdfs files.
|
||||
* Read method is supported for now.
|
||||
*/
|
||||
class StorageHDFS : public ext::shared_ptr_helper<StorageHDFS>, public IStorage
|
||||
class StorageHDFS final : public ext::shared_ptr_helper<StorageHDFS>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageHDFS>;
|
||||
public:
|
||||
|
@ -8,7 +8,7 @@ namespace DB
|
||||
/** Internal temporary storage for table function input(...)
|
||||
*/
|
||||
|
||||
class StorageInput : public ext::shared_ptr_helper<StorageInput>, public IStorage
|
||||
class StorageInput final : public ext::shared_ptr_helper<StorageInput>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageInput>;
|
||||
public:
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Interpreters/getHeaderForProcessingStage.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
@ -37,27 +38,6 @@ namespace ErrorCodes
|
||||
extern const int SAMPLING_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
/// Rewrite original query removing joined tables from it
|
||||
void removeJoin(const ASTSelectQuery & select)
|
||||
{
|
||||
const auto & tables = select.tables();
|
||||
if (!tables || tables->children.size() < 2)
|
||||
return;
|
||||
|
||||
const auto & joined_table = tables->children[1]->as<ASTTablesInSelectQueryElement &>();
|
||||
if (!joined_table.table_join)
|
||||
return;
|
||||
|
||||
/// The most simple temporary solution: leave only the first table in query.
|
||||
/// TODO: we also need to remove joined columns and related functions (taking in account aliases if any).
|
||||
tables->children.resize(1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
StorageMerge::StorageMerge(
|
||||
const StorageID & table_id_,
|
||||
@ -178,7 +158,7 @@ Pipe StorageMerge::read(
|
||||
modified_context->setSetting("optimize_move_to_prewhere", false);
|
||||
|
||||
/// What will be result structure depending on query processed stage in source tables?
|
||||
Block header = getQueryHeader(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
|
||||
Block header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
|
||||
|
||||
/** First we make list of selected tables to find out its size.
|
||||
* This is necessary to correctly pass the recommended number of threads to each table.
|
||||
@ -430,41 +410,6 @@ void StorageMerge::alter(
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
Block StorageMerge::getQueryHeader(
|
||||
const IStorage & storage,
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage)
|
||||
{
|
||||
switch (processed_stage)
|
||||
{
|
||||
case QueryProcessingStage::FetchColumns:
|
||||
{
|
||||
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
|
||||
if (query_info.prewhere_info)
|
||||
{
|
||||
query_info.prewhere_info->prewhere_actions->execute(header);
|
||||
if (query_info.prewhere_info->remove_prewhere_column)
|
||||
header.erase(query_info.prewhere_info->prewhere_column_name);
|
||||
}
|
||||
return header;
|
||||
}
|
||||
case QueryProcessingStage::WithMergeableState:
|
||||
case QueryProcessingStage::Complete:
|
||||
{
|
||||
auto query = query_info.query->clone();
|
||||
removeJoin(*query->as<ASTSelectQuery>());
|
||||
|
||||
auto stream = std::make_shared<OneBlockInputStream>(
|
||||
metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()));
|
||||
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
||||
}
|
||||
}
|
||||
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
void StorageMerge::convertingSourceStream(
|
||||
const Block & header,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
|
@ -13,7 +13,7 @@ namespace DB
|
||||
/** A table that represents the union of an arbitrary number of other tables.
|
||||
* All tables must have the same structure.
|
||||
*/
|
||||
class StorageMerge : public ext::shared_ptr_helper<StorageMerge>, public IStorage
|
||||
class StorageMerge final : public ext::shared_ptr_helper<StorageMerge>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageMerge>;
|
||||
public:
|
||||
@ -47,14 +47,6 @@ public:
|
||||
bool mayBenefitFromIndexForIn(
|
||||
const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override;
|
||||
|
||||
static Block getQueryHeader(
|
||||
const IStorage & storage,
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage);
|
||||
|
||||
private:
|
||||
String source_database;
|
||||
OptimizedRegularExpression table_name_regexp;
|
||||
|
@ -20,7 +20,7 @@ namespace DB
|
||||
* Use ENGINE = mysql(host_port, database_name, table_name, user_name, password)
|
||||
* Read only.
|
||||
*/
|
||||
class StorageMySQL : public ext::shared_ptr_helper<StorageMySQL>, public IStorage
|
||||
class StorageMySQL final : public ext::shared_ptr_helper<StorageMySQL>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageMySQL>;
|
||||
public:
|
||||
|
@ -5,9 +5,8 @@
|
||||
#include <Storages/StorageProxy.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Processors/Transforms/ConvertingTransform.h>
|
||||
#include <Interpreters/getHeaderForProcessingStage.h>
|
||||
|
||||
//#include <common/logger_useful.h>
|
||||
#include <Storages/StorageMerge.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -28,17 +27,14 @@ public:
|
||||
StorageInMemoryMetadata cached_metadata;
|
||||
cached_metadata.setColumns(std::move(cached_columns));
|
||||
setInMemoryMetadata(cached_metadata);
|
||||
//log = &Poco::Logger::get("TABLE_FUNCTION_PROXY");
|
||||
}
|
||||
|
||||
StoragePtr getNested() const override
|
||||
{
|
||||
//LOG_WARNING(log, "getNested()");
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
return nested;
|
||||
|
||||
//LOG_WARNING(log, "getNested() creating");
|
||||
auto nested_storage = get_nested();
|
||||
nested_storage->startup();
|
||||
nested = nested_storage;
|
||||
@ -46,12 +42,6 @@ public:
|
||||
return nested;
|
||||
}
|
||||
|
||||
StoragePtr maybeGetNested() const
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
return nested;
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
@ -63,9 +53,9 @@ public:
|
||||
void startup() override { }
|
||||
void shutdown() override
|
||||
{
|
||||
auto storage = maybeGetNested();
|
||||
if (storage)
|
||||
storage->shutdown();
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
nested->shutdown();
|
||||
}
|
||||
|
||||
Pipe read(
|
||||
@ -80,19 +70,17 @@ public:
|
||||
String cnames;
|
||||
for (const auto & c : column_names)
|
||||
cnames += c + " ";
|
||||
//LOG_WARNING(log, "read() {} cols: {}", QueryProcessingStage::toString(processed_stage), cnames);
|
||||
auto storage = getNested();
|
||||
auto nested_metadata = storage->getInMemoryMetadataPtr();
|
||||
auto pipe = storage->read(column_names, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
if (!pipe.empty())
|
||||
{
|
||||
auto to_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
auto to = StorageMerge::getQueryHeader(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
|
||||
//LOG_WARNING(log, "try convert \n{}\n to \n{}\n", header.dumpStructure(), to.dumpStructure());
|
||||
return std::make_shared<ConvertingTransform>(
|
||||
header,
|
||||
to,
|
||||
to_header,
|
||||
ConvertingTransform::MatchColumnsMode::Name);
|
||||
});
|
||||
}
|
||||
@ -116,7 +104,8 @@ public:
|
||||
|
||||
void renameInMemory(const StorageID & new_table_id) override
|
||||
{
|
||||
if (maybeGetNested())
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
StorageProxy::renameInMemory(new_table_id);
|
||||
else
|
||||
IStorage::renameInMemory(new_table_id);
|
||||
@ -126,7 +115,6 @@ private:
|
||||
mutable std::mutex nested_mutex;
|
||||
mutable GetNestedStorageFunc get_nested;
|
||||
mutable StoragePtr nested;
|
||||
//mutable Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ private:
|
||||
BlockOutputStreamPtr writer;
|
||||
};
|
||||
|
||||
class StorageURL : public ext::shared_ptr_helper<StorageURL>, public IStorageURLBase
|
||||
class StorageURL final : public ext::shared_ptr_helper<StorageURL>, public IStorageURLBase
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageURL>;
|
||||
public:
|
||||
|
@ -9,7 +9,7 @@ namespace DB
|
||||
/* One block storage used for values table function
|
||||
* It's structure is similar to IStorageSystemOneBlock
|
||||
*/
|
||||
class StorageValues : public ext::shared_ptr_helper<StorageValues>, public IStorage
|
||||
class StorageValues final : public ext::shared_ptr_helper<StorageValues>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageValues>;
|
||||
public:
|
||||
|
@ -23,7 +23,7 @@ class Context;
|
||||
* In multithreaded case, if even_distributed is False, implementation with atomic is used,
|
||||
* and result is always in [0 ... limit - 1] range.
|
||||
*/
|
||||
class StorageSystemNumbers : public ext::shared_ptr_helper<StorageSystemNumbers>, public IStorage
|
||||
class StorageSystemNumbers final : public ext::shared_ptr_helper<StorageSystemNumbers>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageSystemNumbers>;
|
||||
public:
|
||||
|
@ -14,7 +14,7 @@ namespace DB
|
||||
* You could also specify a limit (how many zeros to give).
|
||||
* If multithreaded is specified, zeros will be generated in several streams.
|
||||
*/
|
||||
class StorageSystemZeros : public ext::shared_ptr_helper<StorageSystemZeros>, public IStorage
|
||||
class StorageSystemZeros final : public ext::shared_ptr_helper<StorageSystemZeros>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageSystemZeros>;
|
||||
public:
|
||||
|
@ -73,10 +73,8 @@ ColumnsDescription getStructureOfRemoteTableInShard(
|
||||
{
|
||||
if (shard_info.isLocal())
|
||||
{
|
||||
const auto * table_function = table_func_ptr->as<ASTFunction>();
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
|
||||
auto storage_ptr = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
|
||||
return storage_ptr->getInMemoryMetadataPtr()->getColumns();
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
|
||||
return table_function_ptr->getActualTableStructure(context);
|
||||
}
|
||||
|
||||
auto table_func_name = queryToString(table_func_ptr);
|
||||
|
@ -14,13 +14,12 @@ namespace ProfileEvents
|
||||
namespace DB
|
||||
{
|
||||
|
||||
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns_) const
|
||||
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
|
||||
context.checkAccess(AccessType::CREATE_TEMPORARY_TABLE | StorageFactory::instance().getSourceAccessType(getStorageTypeName()));
|
||||
cached_columns = std::move(cached_columns_);
|
||||
|
||||
bool no_conversion_required = hasStaticStructure() && cached_columns == getActualTableStructure(ast_function, context);
|
||||
bool no_conversion_required = hasStaticStructure() && cached_columns == getActualTableStructure(context);
|
||||
if (cached_columns.empty() || no_conversion_required)
|
||||
return executeImpl(ast_function, context, table_name);
|
||||
|
||||
@ -29,7 +28,7 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context &
|
||||
return tf->executeImpl(ast_function, context, table_name);
|
||||
};
|
||||
|
||||
return std::make_shared<StorageTableFunctionProxy>(StorageID(getDatabaseName(), table_name), std::move(get_storage), cached_columns);
|
||||
return std::make_shared<StorageTableFunctionProxy>(StorageID(getDatabaseName(), table_name), std::move(get_storage), std::move(cached_columns));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -34,7 +34,9 @@ public:
|
||||
|
||||
virtual bool hasStaticStructure() const { return false; }
|
||||
|
||||
virtual ColumnsDescription getActualTableStructure(const ASTPtr & /*ast_function*/, const Context & /*context*/) const = 0;
|
||||
virtual void parseArguments(const ASTPtr & /*ast_function*/, const Context & /*context*/) {}
|
||||
|
||||
virtual ColumnsDescription getActualTableStructure(const Context & /*context*/) const = 0;
|
||||
|
||||
/// Create storage according to the query.
|
||||
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns_ = {}) const;
|
||||
@ -44,9 +46,6 @@ public:
|
||||
private:
|
||||
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const = 0;
|
||||
virtual const char * getStorageTypeName() const = 0;
|
||||
|
||||
protected:
|
||||
mutable ColumnsDescription cached_columns;
|
||||
};
|
||||
|
||||
using TableFunctionPtr = std::shared_ptr<ITableFunction>;
|
||||
|
@ -22,11 +22,8 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, const Context & context) const
|
||||
void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
if (!filename.empty())
|
||||
return;
|
||||
|
||||
/// Parse args
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
@ -60,18 +57,16 @@ void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, const C
|
||||
compression_method = args[3]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
auto columns = getActualTableStructure(ast_function, context);
|
||||
auto columns = getActualTableStructure(context);
|
||||
StoragePtr storage = getStorage(filename, format, columns, const_cast<Context &>(context), table_name, compression_method);
|
||||
storage->startup();
|
||||
return storage;
|
||||
}
|
||||
|
||||
ColumnsDescription ITableFunctionFileLike::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription ITableFunctionFileLike::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
if (structure.empty())
|
||||
{
|
||||
assert(getName() == "file" && format == "Distributed");
|
||||
|
@ -20,13 +20,13 @@ private:
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const String & compression_method) const = 0;
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
mutable String filename;
|
||||
mutable String format;
|
||||
mutable String structure;
|
||||
mutable String compression_method = "auto";
|
||||
String filename;
|
||||
String format;
|
||||
String structure;
|
||||
String compression_method = "auto";
|
||||
};
|
||||
}
|
||||
|
@ -25,15 +25,11 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
void ITableFunctionXDBC::parseArguments(const ASTPtr & ast_function, const Context & context) const
|
||||
void ITableFunctionXDBC::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
if (helper)
|
||||
return;
|
||||
|
||||
const auto & args_func = ast_function->as<ASTFunction &>();
|
||||
|
||||
if (!args_func.arguments)
|
||||
@ -65,9 +61,9 @@ void ITableFunctionXDBC::parseArguments(const ASTPtr & ast_function, const Conte
|
||||
helper->startBridgeSync();
|
||||
}
|
||||
|
||||
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
assert(helper);
|
||||
|
||||
/* Infer external table structure */
|
||||
Poco::URI columns_info_uri = helper->getColumnsInfoURI();
|
||||
@ -89,10 +85,10 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(const ASTPtr & as
|
||||
return ColumnsDescription{columns};
|
||||
}
|
||||
|
||||
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
auto columns = getActualTableStructure(ast_function, context);
|
||||
assert(helper);
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto result = std::make_shared<StorageXDBC>(StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, context, helper);
|
||||
result->startup();
|
||||
return result;
|
||||
|
@ -25,14 +25,14 @@ private:
|
||||
const Poco::Timespan & http_timeout_,
|
||||
const std::string & connection_string_) const = 0;
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
mutable String connection_string;
|
||||
mutable String schema_name;
|
||||
mutable String remote_table_name;
|
||||
mutable BridgeHelperPtr helper;
|
||||
String connection_string;
|
||||
String schema_name;
|
||||
String remote_table_name;
|
||||
BridgeHelperPtr helper;
|
||||
};
|
||||
|
||||
class TableFunctionJDBC : public ITableFunctionXDBC
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -28,19 +29,21 @@ void TableFunctionFactory::registerFunction(const std::string & name, Value crea
|
||||
}
|
||||
|
||||
TableFunctionPtr TableFunctionFactory::get(
|
||||
const std::string & name,
|
||||
const ASTPtr & ast_function,
|
||||
const Context & context) const
|
||||
{
|
||||
auto res = tryGet(name, context);
|
||||
const auto * table_function = ast_function->as<ASTFunction>();
|
||||
auto res = tryGet(table_function->name, context);
|
||||
if (!res)
|
||||
{
|
||||
auto hints = getHints(name);
|
||||
auto hints = getHints(table_function->name);
|
||||
if (!hints.empty())
|
||||
throw Exception("Unknown table function " + name + ". Maybe you meant: " + toString(hints), ErrorCodes::UNKNOWN_FUNCTION);
|
||||
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}. Maybe you meant: {}", table_function->name , toString(hints));
|
||||
else
|
||||
throw Exception("Unknown table function " + name, ErrorCodes::UNKNOWN_FUNCTION);
|
||||
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}", table_function->name);
|
||||
}
|
||||
|
||||
res->parseArguments(ast_function, context);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
}
|
||||
|
||||
/// Throws an exception if not found.
|
||||
TableFunctionPtr get(const std::string & name, const Context & context) const;
|
||||
TableFunctionPtr get(const ASTPtr & ast_function, const Context & context) const;
|
||||
|
||||
/// Returns nullptr if not found.
|
||||
TableFunctionPtr tryGet(const std::string & name, const Context & context) const;
|
||||
|
@ -29,11 +29,8 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, const Context & /*context*/) const
|
||||
void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, const Context & /*context*/)
|
||||
{
|
||||
if (!structure.empty())
|
||||
return;
|
||||
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
if (args_func.size() != 1)
|
||||
@ -80,16 +77,14 @@ void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, co
|
||||
max_array_length = args[3]->as<const ASTLiteral &>().value.safeGet<UInt64>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
auto columns = getActualTableStructure(ast_function, context);
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto res = StorageGenerateRandom::create(StorageID(getDatabaseName(), table_name), columns, max_array_length, max_string_length, random_seed);
|
||||
res->startup();
|
||||
return res;
|
||||
|
@ -17,13 +17,13 @@ private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
const char * getStorageTypeName() const override { return "GenerateRandom"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
mutable String structure;
|
||||
mutable UInt64 max_string_length = 10;
|
||||
mutable UInt64 max_array_length = 10;
|
||||
mutable std::optional<UInt64> random_seed;
|
||||
String structure;
|
||||
UInt64 max_string_length = 10;
|
||||
UInt64 max_array_length = 10;
|
||||
std::optional<UInt64> random_seed;
|
||||
|
||||
};
|
||||
|
||||
|
@ -24,11 +24,8 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
void TableFunctionInput::parseArguments(const ASTPtr & ast_function, const Context & context) const
|
||||
void TableFunctionInput::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
if (!structure.empty())
|
||||
return;
|
||||
|
||||
const auto * function = ast_function->as<ASTFunction>();
|
||||
|
||||
if (!function->arguments)
|
||||
@ -43,16 +40,14 @@ void TableFunctionInput::parseArguments(const ASTPtr & ast_function, const Conte
|
||||
structure = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context)->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionInput::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription TableFunctionInput::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
auto storage = StorageInput::create(StorageID(getDatabaseName(), table_name), getActualTableStructure(ast_function, context));
|
||||
auto storage = StorageInput::create(StorageID(getDatabaseName(), table_name), getActualTableStructure(context));
|
||||
storage->startup();
|
||||
return storage;
|
||||
}
|
||||
|
@ -20,10 +20,10 @@ private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
const char * getStorageTypeName() const override { return "Input"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
mutable String structure;
|
||||
String structure;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ static NamesAndTypesList chooseColumns(const String & source_database, const Str
|
||||
return any_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical();
|
||||
}
|
||||
|
||||
void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Context & context) const
|
||||
void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
@ -69,19 +69,16 @@ void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Conte
|
||||
table_name_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionMerge::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription TableFunctionMerge::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
return ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)};
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
|
||||
auto res = StorageMerge::create(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
getActualTableStructure(ast_function, context),
|
||||
getActualTableStructure(context),
|
||||
source_database,
|
||||
table_name_regexp,
|
||||
context);
|
||||
|
@ -19,11 +19,11 @@ private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
const char * getStorageTypeName() const override { return "Merge"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
mutable String source_database;
|
||||
mutable String table_name_regexp;
|
||||
String source_database;
|
||||
String table_name_regexp;
|
||||
};
|
||||
|
||||
|
||||
|
@ -34,7 +34,7 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_TABLE;
|
||||
}
|
||||
|
||||
void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, const Context & context) const
|
||||
void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
const auto & args_func = ast_function->as<ASTFunction &>();
|
||||
|
||||
@ -70,9 +70,9 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, const Conte
|
||||
parsed_host_port = parseAddress(host_port, 3306);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionMySQL::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription TableFunctionMySQL::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
assert(!parsed_host_port.first.empty());
|
||||
if (!pool)
|
||||
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
|
||||
|
||||
@ -121,13 +121,13 @@ ColumnsDescription TableFunctionMySQL::getActualTableStructure(const ASTPtr & as
|
||||
return ColumnsDescription{columns};
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
assert(!parsed_host_port.first.empty());
|
||||
if (!pool)
|
||||
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
|
||||
|
||||
auto columns = getActualTableStructure(ast_function, context);
|
||||
auto columns = getActualTableStructure(context);
|
||||
|
||||
auto res = StorageMySQL::create(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
|
@ -27,16 +27,16 @@ private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
const char * getStorageTypeName() const override { return "MySQL"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
mutable std::pair<std::string, UInt16> parsed_host_port;
|
||||
mutable String remote_database_name;
|
||||
mutable String remote_table_name;
|
||||
mutable String user_name;
|
||||
mutable String password;
|
||||
mutable bool replace_query = false;
|
||||
mutable String on_duplicate_clause;
|
||||
std::pair<std::string, UInt16> parsed_host_port;
|
||||
String remote_database_name;
|
||||
String remote_table_name;
|
||||
String user_name;
|
||||
String password;
|
||||
bool replace_query = false;
|
||||
String on_duplicate_clause;
|
||||
|
||||
mutable std::optional<mysqlxx::Pool> pool;
|
||||
};
|
||||
|
@ -22,7 +22,7 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
template <bool multithreaded>
|
||||
ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(const ASTPtr & /*ast_function*/, const Context & /*context*/) const
|
||||
ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(const Context & /*context*/) const
|
||||
{
|
||||
return ColumnsDescription({{"number", std::make_shared<DataTypeUInt64>()}});
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ private:
|
||||
|
||||
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
};
|
||||
|
||||
|
||||
|
@ -28,11 +28,8 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
void TableFunctionRemote::prepareClusterInfo(const ASTPtr & ast_function, const Context & context) const
|
||||
void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
if (cluster)
|
||||
return;
|
||||
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
if (args_func.size() != 1)
|
||||
@ -198,14 +195,13 @@ void TableFunctionRemote::prepareClusterInfo(const ASTPtr & ast_function, const
|
||||
remote_table_id.table_name = remote_table;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
prepareClusterInfo(ast_function, context);
|
||||
|
||||
assert(cluster);
|
||||
StoragePtr res = remote_table_function_ptr
|
||||
? StorageDistributed::create(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
getActualTableStructure(ast_function, context),
|
||||
getActualTableStructure(context),
|
||||
ConstraintsDescription{},
|
||||
remote_table_function_ptr,
|
||||
String{},
|
||||
@ -217,7 +213,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
|
||||
cluster)
|
||||
: StorageDistributed::create(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
getActualTableStructure(ast_function, context),
|
||||
getActualTableStructure(context),
|
||||
ConstraintsDescription{},
|
||||
remote_table_id.database_name,
|
||||
remote_table_id.table_name,
|
||||
@ -233,9 +229,9 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
|
||||
return res;
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionRemote::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription TableFunctionRemote::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
prepareClusterInfo(ast_function, context);
|
||||
assert(cluster);
|
||||
return getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
|
||||
}
|
||||
|
||||
|
@ -22,22 +22,22 @@ public:
|
||||
|
||||
std::string getName() const override { return name; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
const char * getStorageTypeName() const override { return "Distributed"; }
|
||||
|
||||
void prepareClusterInfo(const ASTPtr & ast_function, const Context & context) const;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
std::string name;
|
||||
bool is_cluster_function;
|
||||
std::string help_message;
|
||||
bool secure;
|
||||
|
||||
mutable ClusterPtr cluster;
|
||||
mutable StorageID remote_table_id = StorageID::createEmpty();
|
||||
mutable ASTPtr remote_table_function_ptr;
|
||||
ClusterPtr cluster;
|
||||
StorageID remote_table_id = StorageID::createEmpty();
|
||||
ASTPtr remote_table_function_ptr;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -22,10 +22,8 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, const Context & context) const
|
||||
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
|
||||
|
||||
/// Parse args
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
@ -60,16 +58,13 @@ void TableFunctionS3::parseArguments(const ASTPtr & ast_function, const Context
|
||||
compression_method = args.back()->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionS3::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription TableFunctionS3::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
|
||||
Poco::URI uri (filename);
|
||||
S3::URI s3_uri (uri);
|
||||
UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
|
||||
@ -81,7 +76,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Conte
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
format,
|
||||
min_upload_part_size,
|
||||
getActualTableStructure(ast_function, context),
|
||||
getActualTableStructure(context),
|
||||
ConstraintsDescription{},
|
||||
const_cast<Context &>(context),
|
||||
compression_method);
|
||||
|
@ -31,15 +31,15 @@ protected:
|
||||
|
||||
const char * getStorageTypeName() const override { return "S3"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
mutable String filename;
|
||||
mutable String format;
|
||||
mutable String structure;
|
||||
mutable String access_key_id;
|
||||
mutable String secret_access_key;
|
||||
mutable String compression_method = "auto";
|
||||
String filename;
|
||||
String format;
|
||||
String structure;
|
||||
String access_key_id;
|
||||
String secret_access_key;
|
||||
String compression_method = "auto";
|
||||
};
|
||||
|
||||
class TableFunctionCOS : public TableFunctionS3
|
||||
|
@ -14,7 +14,7 @@ StoragePtr TableFunctionURL::getStorage(
|
||||
const std::string & table_name, const String & compression_method_) const
|
||||
{
|
||||
Poco::URI uri(source);
|
||||
return StorageURL::create( uri, StorageID(getDatabaseName(), table_name), format_, columns, ConstraintsDescription{},
|
||||
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), format_, columns, ConstraintsDescription{},
|
||||
global_context, compression_method_);
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,7 @@ static void parseAndInsertValues(MutableColumns & res_columns, const ASTs & args
|
||||
}
|
||||
}
|
||||
|
||||
void TableFunctionValues::parseArguments(const ASTPtr & ast_function, const Context & /*context*/) const
|
||||
void TableFunctionValues::parseArguments(const ASTPtr & ast_function, const Context & /*context*/)
|
||||
{
|
||||
|
||||
|
||||
@ -90,17 +90,14 @@ void TableFunctionValues::parseArguments(const ASTPtr & ast_function, const Cont
|
||||
structure = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionValues::getActualTableStructure(const ASTPtr & ast_function, const Context & context) const
|
||||
ColumnsDescription TableFunctionValues::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
{
|
||||
parseArguments(ast_function, context);
|
||||
|
||||
auto columns = getActualTableStructure(ast_function, context);
|
||||
auto columns = getActualTableStructure(context);
|
||||
|
||||
Block sample_block;
|
||||
for (const auto & name_type : columns.getOrdinary())
|
||||
|
@ -16,10 +16,10 @@ private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
const char * getStorageTypeName() const override { return "Values"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) const;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
mutable String structure;
|
||||
String structure;
|
||||
};
|
||||
|
||||
|
||||
|
@ -21,7 +21,7 @@ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
|
||||
|
||||
template <bool multithreaded>
|
||||
ColumnsDescription TableFunctionZeros<multithreaded>::getActualTableStructure(const ASTPtr & /*ast_function*/, const Context & /*context*/) const
|
||||
ColumnsDescription TableFunctionZeros<multithreaded>::getActualTableStructure(const Context & /*context*/) const
|
||||
{
|
||||
return ColumnsDescription({{"zero", std::make_shared<DataTypeUInt8>()}});
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ private:
|
||||
|
||||
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
|
||||
|
||||
ColumnsDescription getActualTableStructure(const ASTPtr & ast_function, const Context & context) const override;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
};
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user