mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #14295 from ClickHouse/write_structure_of_table_functions
Write structure of table functions to metadata
This commit is contained in:
commit
f60ccb4edf
@ -52,6 +52,7 @@ RUN apt-get update \
|
||||
RUN echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 history_size=7'" >> /etc/environment; \
|
||||
echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment; \
|
||||
echo "MSAN_OPTIONS='abort_on_error=1'" >> /etc/environment; \
|
||||
echo "LSAN_OPTIONS='suppressions=/usr/share/clickhouse-test/config/lsan_suppressions.txt'" >> /etc/environment; \
|
||||
ln -s /usr/lib/llvm-${LLVM_VERSION}/bin/llvm-symbolizer /usr/bin/llvm-symbolizer;
|
||||
# Sanitizer options for current shell (not current, but the one that will be spawned on "docker run")
|
||||
# (but w/o verbosity for TSAN, otherwise test.reference will not match)
|
||||
|
@ -54,9 +54,12 @@ std::pair<String, StoragePtr> createTableFromAST(
|
||||
|
||||
if (ast_create_query.as_table_function)
|
||||
{
|
||||
const auto & table_function = ast_create_query.as_table_function->as<ASTFunction &>();
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
StoragePtr storage = factory.get(table_function.name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table);
|
||||
auto table_function = factory.get(ast_create_query.as_table_function, context);
|
||||
ColumnsDescription columns;
|
||||
if (ast_create_query.columns_list && ast_create_query.columns_list->columns)
|
||||
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, false);
|
||||
StoragePtr storage = table_function->execute(ast_create_query.as_table_function, context, ast_create_query.table, std::move(columns));
|
||||
storage->renameInMemory(ast_create_query);
|
||||
return {ast_create_query.table, storage};
|
||||
}
|
||||
|
@ -254,9 +254,12 @@ void DatabaseOrdinary::alterTable(const Context & context, const StorageID & tab
|
||||
|
||||
auto & ast_create_query = ast->as<ASTCreateQuery &>();
|
||||
|
||||
if (ast_create_query.as_table_function)
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function", backQuote(table_name));
|
||||
bool has_structure = ast_create_query.columns_list && ast_create_query.columns_list->columns;
|
||||
if (ast_create_query.as_table_function && !has_structure)
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function"
|
||||
" and doesn't have structure in metadata", backQuote(table_name));
|
||||
|
||||
assert(has_structure);
|
||||
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
|
||||
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.secondary_indices);
|
||||
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(metadata.constraints);
|
||||
|
@ -159,8 +159,7 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
if (table_func_ptr)
|
||||
{
|
||||
const auto * table_function = table_func_ptr->as<ASTFunction>();
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
|
||||
main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
|
||||
}
|
||||
else
|
||||
|
@ -933,7 +933,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
|
||||
|
||||
if (!res)
|
||||
{
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this);
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression, *this);
|
||||
|
||||
/// Run it and remember the result
|
||||
res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName());
|
||||
|
@ -453,6 +453,9 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
|
||||
|
||||
if (create.columns_list)
|
||||
{
|
||||
if (create.as_table_function && (create.columns_list->indices || create.columns_list->constraints))
|
||||
throw Exception("Indexes and constraints are not supported for table functions", ErrorCodes::INCORRECT_QUERY);
|
||||
|
||||
if (create.columns_list->columns)
|
||||
{
|
||||
bool sanity_check_compression_codecs = !create.attach && !context.getSettingsRef().allow_suspicious_codecs;
|
||||
@ -489,7 +492,12 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
|
||||
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
|
||||
}
|
||||
else if (create.as_table_function)
|
||||
return {};
|
||||
{
|
||||
/// Table function without columns list.
|
||||
auto table_function = TableFunctionFactory::instance().get(create.as_table_function, context);
|
||||
properties.columns = table_function->getActualTableStructure(context);
|
||||
assert(!properties.columns.empty());
|
||||
}
|
||||
else
|
||||
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
|
||||
|
||||
@ -575,9 +583,12 @@ void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & creat
|
||||
|
||||
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
|
||||
{
|
||||
if (create.as_table_function)
|
||||
return;
|
||||
|
||||
if (create.storage || create.is_view || create.is_materialized_view || create.is_live_view || create.is_dictionary)
|
||||
{
|
||||
if (create.temporary && create.storage->engine->name != "Memory")
|
||||
if (create.temporary && create.storage && create.storage->engine && create.storage->engine->name != "Memory")
|
||||
throw Exception(
|
||||
"Temporary tables can only be created with ENGINE = Memory, not " + create.storage->engine->name,
|
||||
ErrorCodes::INCORRECT_QUERY);
|
||||
@ -757,9 +768,8 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
/// NOTE: CREATE query may be rewritten by Storage creator or table function
|
||||
if (create.as_table_function)
|
||||
{
|
||||
const auto & table_function = create.as_table_function->as<ASTFunction &>();
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table);
|
||||
res = factory.get(create.as_table_function, context)->execute(create.as_table_function, context, create.table, properties.columns);
|
||||
res->renameInMemory({create.database, create.table, create.uuid});
|
||||
}
|
||||
else
|
||||
|
@ -72,23 +72,16 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
|
||||
table_expression.subquery->children.at(0), context).getNamesAndTypesList();
|
||||
columns = ColumnsDescription(std::move(names_and_types));
|
||||
}
|
||||
else if (table_expression.table_function)
|
||||
{
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, context);
|
||||
columns = table_function_ptr->getActualTableStructure(context);
|
||||
}
|
||||
else
|
||||
{
|
||||
StoragePtr table;
|
||||
if (table_expression.table_function)
|
||||
{
|
||||
const auto & table_function = table_expression.table_function->as<ASTFunction &>();
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function.name, context);
|
||||
/// Run the table function and remember the result
|
||||
table = table_function_ptr->execute(table_expression.table_function, context, table_function_ptr->getName());
|
||||
}
|
||||
else
|
||||
{
|
||||
auto table_id = context.resolveStorageID(table_expression.database_and_table_name);
|
||||
context.checkAccess(AccessType::SHOW_COLUMNS, table_id);
|
||||
table = DatabaseCatalog::instance().getTable(table_id, context);
|
||||
}
|
||||
|
||||
auto table_id = context.resolveStorageID(table_expression.database_and_table_name);
|
||||
context.checkAccess(AccessType::SHOW_COLUMNS, table_id);
|
||||
auto table = DatabaseCatalog::instance().getTable(table_id, context);
|
||||
auto table_lock = table->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
auto metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||
columns = metadata_snapshot->getColumns();
|
||||
|
@ -67,9 +67,8 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
|
||||
{
|
||||
if (query.table_function)
|
||||
{
|
||||
const auto * table_function = query.table_function->as<ASTFunction>();
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
TableFunctionPtr table_function_ptr = factory.get(table_function->name, context);
|
||||
TableFunctionPtr table_function_ptr = factory.get(query.table_function, context);
|
||||
return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
|
||||
}
|
||||
|
||||
|
69
src/Interpreters/getHeaderForProcessingStage.cpp
Normal file
69
src/Interpreters/getHeaderForProcessingStage.cpp
Normal file
@ -0,0 +1,69 @@
|
||||
#include <Interpreters/getHeaderForProcessingStage.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
/// Rewrite original query removing joined tables from it
|
||||
bool removeJoin(ASTSelectQuery & select)
|
||||
{
|
||||
const auto & tables = select.tables();
|
||||
if (!tables || tables->children.size() < 2)
|
||||
return false;
|
||||
|
||||
const auto & joined_table = tables->children[1]->as<ASTTablesInSelectQueryElement &>();
|
||||
if (!joined_table.table_join)
|
||||
return false;
|
||||
|
||||
/// The most simple temporary solution: leave only the first table in query.
|
||||
/// TODO: we also need to remove joined columns and related functions (taking in account aliases if any).
|
||||
tables->children.resize(1);
|
||||
return true;
|
||||
}
|
||||
|
||||
Block getHeaderForProcessingStage(
|
||||
const IStorage & storage,
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage)
|
||||
{
|
||||
switch (processed_stage)
|
||||
{
|
||||
case QueryProcessingStage::FetchColumns:
|
||||
{
|
||||
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
|
||||
if (query_info.prewhere_info)
|
||||
{
|
||||
query_info.prewhere_info->prewhere_actions->execute(header);
|
||||
if (query_info.prewhere_info->remove_prewhere_column)
|
||||
header.erase(query_info.prewhere_info->prewhere_column_name);
|
||||
}
|
||||
return header;
|
||||
}
|
||||
case QueryProcessingStage::WithMergeableState:
|
||||
case QueryProcessingStage::Complete:
|
||||
case QueryProcessingStage::WithMergeableStateAfterAggregation:
|
||||
case QueryProcessingStage::MAX:
|
||||
{
|
||||
auto query = query_info.query->clone();
|
||||
removeJoin(*query->as<ASTSelectQuery>());
|
||||
|
||||
auto stream = std::make_shared<OneBlockInputStream>(
|
||||
metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()));
|
||||
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
||||
}
|
||||
}
|
||||
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
}
|
||||
|
27
src/Interpreters/getHeaderForProcessingStage.h
Normal file
27
src/Interpreters/getHeaderForProcessingStage.h
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
#include <Core/Block.h>
|
||||
#include <Core/Names.h>
|
||||
#include <Core/QueryProcessingStage.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IStorage;
|
||||
struct StorageInMemoryMetadata;
|
||||
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
|
||||
struct SelectQueryInfo;
|
||||
class Context;
|
||||
class ASTSelectQuery;
|
||||
|
||||
bool removeJoin(ASTSelectQuery & select);
|
||||
|
||||
Block getHeaderForProcessingStage(
|
||||
const IStorage & storage,
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage);
|
||||
|
||||
}
|
@ -63,6 +63,7 @@ SRCS(
|
||||
ExtractExpressionInfoVisitor.cpp
|
||||
FillingRow.cpp
|
||||
getClusterName.cpp
|
||||
getHeaderForProcessingStage.cpp
|
||||
getTableExpressions.cpp
|
||||
HashJoin.cpp
|
||||
IdentifierSemantic.cpp
|
||||
|
@ -282,13 +282,23 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
|
||||
|
||||
if (as_table_function)
|
||||
{
|
||||
if (columns_list)
|
||||
{
|
||||
frame.expression_list_always_start_on_new_line = true;
|
||||
settings.ostr << (settings.one_line ? " (" : "\n(");
|
||||
FormatStateStacked frame_nested = frame;
|
||||
columns_list->formatImpl(settings, state, frame_nested);
|
||||
settings.ostr << (settings.one_line ? ")" : "\n)");
|
||||
frame.expression_list_always_start_on_new_line = false;
|
||||
}
|
||||
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");
|
||||
as_table_function->formatImpl(settings, state, frame);
|
||||
}
|
||||
|
||||
frame.expression_list_always_start_on_new_line = true;
|
||||
|
||||
if (columns_list)
|
||||
if (columns_list && !as_table_function)
|
||||
{
|
||||
settings.ostr << (settings.one_line ? " (" : "\n(");
|
||||
FormatStateStacked frame_nested = frame;
|
||||
|
@ -416,7 +416,12 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
|
||||
return false;
|
||||
|
||||
if (!storage_p.parse(pos, storage, expected) && !is_temporary)
|
||||
return false;
|
||||
{
|
||||
if (!s_as.ignore(pos, expected))
|
||||
return false;
|
||||
if (!table_function_p.parse(pos, as_table_function, expected))
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -360,12 +360,14 @@ StorageDistributed::StorageDistributed(
|
||||
const ASTPtr & sharding_key_,
|
||||
const String & storage_policy_name_,
|
||||
const String & relative_data_path_,
|
||||
bool attach_)
|
||||
bool attach_,
|
||||
ClusterPtr owned_cluster_)
|
||||
: IStorage(id_)
|
||||
, remote_database(remote_database_)
|
||||
, remote_table(remote_table_)
|
||||
, global_context(std::make_unique<Context>(context_))
|
||||
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
|
||||
, owned_cluster(std::move(owned_cluster_))
|
||||
, cluster_name(global_context->getMacros()->expand(cluster_name_))
|
||||
, has_sharding_key(sharding_key_)
|
||||
, relative_data_path(relative_data_path_)
|
||||
@ -411,39 +413,13 @@ StorageDistributed::StorageDistributed(
|
||||
const ASTPtr & sharding_key_,
|
||||
const String & storage_policy_name_,
|
||||
const String & relative_data_path_,
|
||||
bool attach)
|
||||
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach)
|
||||
bool attach,
|
||||
ClusterPtr owned_cluster_)
|
||||
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach, std::move(owned_cluster_))
|
||||
{
|
||||
remote_table_function_ptr = std::move(remote_table_function_ptr_);
|
||||
}
|
||||
|
||||
|
||||
StoragePtr StorageDistributed::createWithOwnCluster(
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const String & remote_database_, /// database on remote servers.
|
||||
const String & remote_table_, /// The name of the table on the remote servers.
|
||||
ClusterPtr owned_cluster_,
|
||||
const Context & context_)
|
||||
{
|
||||
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), String(), false);
|
||||
res->owned_cluster = std::move(owned_cluster_);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
StoragePtr StorageDistributed::createWithOwnCluster(
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
ASTPtr & remote_table_function_ptr_,
|
||||
ClusterPtr & owned_cluster_,
|
||||
const Context & context_)
|
||||
{
|
||||
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), String(), false);
|
||||
res->owned_cluster = owned_cluster_;
|
||||
return res;
|
||||
}
|
||||
|
||||
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
|
||||
{
|
||||
const auto & settings = context.getSettingsRef();
|
||||
|
@ -42,21 +42,6 @@ class StorageDistributed final : public ext::shared_ptr_helper<StorageDistribute
|
||||
public:
|
||||
~StorageDistributed() override;
|
||||
|
||||
static StoragePtr createWithOwnCluster(
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const String & remote_database_, /// database on remote servers.
|
||||
const String & remote_table_, /// The name of the table on the remote servers.
|
||||
ClusterPtr owned_cluster_,
|
||||
const Context & context_);
|
||||
|
||||
static StoragePtr createWithOwnCluster(
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
ASTPtr & remote_table_function_ptr_, /// Table function ptr.
|
||||
ClusterPtr & owned_cluster_,
|
||||
const Context & context_);
|
||||
|
||||
std::string getName() const override { return "Distributed"; }
|
||||
|
||||
bool supportsSampling() const override { return true; }
|
||||
@ -165,7 +150,8 @@ protected:
|
||||
const ASTPtr & sharding_key_,
|
||||
const String & storage_policy_name_,
|
||||
const String & relative_data_path_,
|
||||
bool attach_);
|
||||
bool attach_,
|
||||
ClusterPtr owned_cluster_ = {});
|
||||
|
||||
StorageDistributed(
|
||||
const StorageID & id_,
|
||||
@ -177,7 +163,8 @@ protected:
|
||||
const ASTPtr & sharding_key_,
|
||||
const String & storage_policy_name_,
|
||||
const String & relative_data_path_,
|
||||
bool attach);
|
||||
bool attach,
|
||||
ClusterPtr owned_cluster_ = {});
|
||||
|
||||
String relative_data_path;
|
||||
|
||||
|
@ -51,6 +51,7 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_IDENTIFIER;
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int INCOMPATIBLE_COLUMNS;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -125,11 +126,33 @@ void checkCreationIsAllowed(const Context & context_global, const std::string &
|
||||
}
|
||||
}
|
||||
|
||||
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, const Context & context)
|
||||
{
|
||||
String user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString();
|
||||
Poco::Path poco_path = Poco::Path(table_path);
|
||||
if (poco_path.isRelative())
|
||||
poco_path = Poco::Path(user_files_absolute_path, poco_path);
|
||||
|
||||
Strings paths;
|
||||
const String path = poco_path.absolute().toString();
|
||||
if (path.find_first_of("*?{") == std::string::npos)
|
||||
paths.push_back(path);
|
||||
else
|
||||
paths = listFilesWithRegexpMatching("/", path);
|
||||
|
||||
for (const auto & cur_path : paths)
|
||||
checkCreationIsAllowed(context, user_files_absolute_path, cur_path);
|
||||
|
||||
return paths;
|
||||
}
|
||||
|
||||
StorageFile::StorageFile(int table_fd_, CommonArguments args)
|
||||
: StorageFile(args)
|
||||
{
|
||||
if (args.context.getApplicationType() == Context::ApplicationType::SERVER)
|
||||
throw Exception("Using file descriptor as source of storage isn't allowed for server daemons", ErrorCodes::DATABASE_ACCESS_DENIED);
|
||||
if (args.format_name == "Distributed")
|
||||
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||
|
||||
is_db_table = false;
|
||||
use_table_fd = true;
|
||||
@ -144,32 +167,22 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
|
||||
: StorageFile(args)
|
||||
{
|
||||
is_db_table = false;
|
||||
std::string user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString();
|
||||
Poco::Path poco_path = Poco::Path(table_path_);
|
||||
if (poco_path.isRelative())
|
||||
poco_path = Poco::Path(user_files_absolute_path, poco_path);
|
||||
|
||||
const std::string path = poco_path.absolute().toString();
|
||||
if (path.find_first_of("*?{") == std::string::npos)
|
||||
paths.push_back(path);
|
||||
else
|
||||
paths = listFilesWithRegexpMatching("/", path);
|
||||
|
||||
for (const auto & cur_path : paths)
|
||||
checkCreationIsAllowed(args.context, user_files_absolute_path, cur_path);
|
||||
paths = getPathsList(table_path_, user_files_path, args.context);
|
||||
|
||||
if (args.format_name == "Distributed")
|
||||
{
|
||||
if (!paths.empty())
|
||||
{
|
||||
auto & first_path = paths[0];
|
||||
Block header = StorageDistributedDirectoryMonitor::createStreamFromFile(first_path)->getHeader();
|
||||
if (paths.empty())
|
||||
throw Exception("Cannot get table structure from file, because no files match specified name", ErrorCodes::INCORRECT_FILE_NAME);
|
||||
|
||||
auto & first_path = paths[0];
|
||||
Block header = StorageDistributedDirectoryMonitor::createStreamFromFile(first_path)->getHeader();
|
||||
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(ColumnsDescription(header.getNamesAndTypesList()));
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
auto columns = ColumnsDescription(header.getNamesAndTypesList());
|
||||
if (!args.columns.empty() && columns != args.columns)
|
||||
throw Exception("Table structure and file structure are different", ErrorCodes::INCOMPATIBLE_COLUMNS);
|
||||
storage_metadata.setColumns(columns);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
}
|
||||
|
||||
@ -178,6 +191,8 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
|
||||
{
|
||||
if (relative_table_dir_path.empty())
|
||||
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||
if (args.format_name == "Distributed")
|
||||
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);
|
||||
|
||||
String table_dir_path = base_path + relative_table_dir_path + "/";
|
||||
Poco::File(table_dir_path).createDirectories();
|
||||
|
@ -60,6 +60,8 @@ public:
|
||||
|
||||
NamesAndTypesList getVirtuals() const override;
|
||||
|
||||
static Strings getPathsList(const String & table_path, const String & user_files_path, const Context & context);
|
||||
|
||||
protected:
|
||||
friend class StorageFileSource;
|
||||
friend class StorageFileBlockOutputStream;
|
||||
|
@ -23,7 +23,7 @@ namespace DB
|
||||
{
|
||||
|
||||
StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_)
|
||||
: IStorage(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_)
|
||||
: StorageProxy(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_)
|
||||
{
|
||||
auto nested_memory_metadata = nested_storage->getInMemoryMetadata();
|
||||
StorageInMemoryMetadata in_memory_metadata;
|
||||
|
@ -4,31 +4,41 @@
|
||||
|
||||
#if USE_MYSQL
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/StorageProxy.h>
|
||||
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageMaterializeMySQL final : public ext::shared_ptr_helper<StorageMaterializeMySQL>, public IStorage
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
class StorageMaterializeMySQL final : public ext::shared_ptr_helper<StorageMaterializeMySQL>, public StorageProxy
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageMaterializeMySQL>;
|
||||
public:
|
||||
String getName() const override { return "MaterializeMySQL"; }
|
||||
|
||||
bool supportsFinal() const override { return nested_storage->supportsFinal(); }
|
||||
bool supportsSampling() const override { return nested_storage->supportsSampling(); }
|
||||
|
||||
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_);
|
||||
|
||||
Pipe read(
|
||||
const Names & column_names, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info,
|
||||
const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;
|
||||
|
||||
BlockOutputStreamPtr write(const ASTPtr &, const StorageMetadataPtr &, const Context &) override { throwNotAllowed(); }
|
||||
|
||||
NamesAndTypesList getVirtuals() const override;
|
||||
ColumnSizeByName getColumnSizes() const override;
|
||||
|
||||
private:
|
||||
StoragePtr getNested() const override { return nested_storage; }
|
||||
[[noreturn]] void throwNotAllowed() const
|
||||
{
|
||||
throw Exception("This method is not allowed for MaterializeMySQ", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
StoragePtr nested_storage;
|
||||
const DatabaseMaterializeMySQL * database;
|
||||
};
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Interpreters/getHeaderForProcessingStage.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
@ -42,23 +43,6 @@ namespace ErrorCodes
|
||||
namespace
|
||||
{
|
||||
|
||||
/// Rewrite original query removing joined tables from it
|
||||
bool removeJoin(ASTSelectQuery & select)
|
||||
{
|
||||
const auto & tables = select.tables();
|
||||
if (!tables || tables->children.size() < 2)
|
||||
return false;
|
||||
|
||||
const auto & joined_table = tables->children[1]->as<ASTTablesInSelectQueryElement &>();
|
||||
if (!joined_table.table_join)
|
||||
return false;
|
||||
|
||||
/// The most simple temporary solution: leave only the first table in query.
|
||||
/// TODO: we also need to remove joined columns and related functions (taking in account aliases if any).
|
||||
tables->children.resize(1);
|
||||
return true;
|
||||
}
|
||||
|
||||
void modifySelect(ASTSelectQuery & select, const TreeRewriterResult & rewriter_result)
|
||||
{
|
||||
if (removeJoin(select))
|
||||
@ -83,7 +67,6 @@ void modifySelect(ASTSelectQuery & select, const TreeRewriterResult & rewriter_r
|
||||
|
||||
}
|
||||
|
||||
|
||||
StorageMerge::StorageMerge(
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
@ -203,7 +186,7 @@ Pipe StorageMerge::read(
|
||||
modified_context->setSetting("optimize_move_to_prewhere", false);
|
||||
|
||||
/// What will be result structure depending on query processed stage in source tables?
|
||||
Block header = getQueryHeader(column_names, metadata_snapshot, query_info, context, processed_stage);
|
||||
Block header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
|
||||
|
||||
/** First we make list of selected tables to find out its size.
|
||||
* This is necessary to correctly pass the recommended number of threads to each table.
|
||||
@ -456,42 +439,6 @@ void StorageMerge::alter(
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
Block StorageMerge::getQueryHeader(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage)
|
||||
{
|
||||
switch (processed_stage)
|
||||
{
|
||||
case QueryProcessingStage::FetchColumns:
|
||||
{
|
||||
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
||||
if (query_info.prewhere_info)
|
||||
{
|
||||
query_info.prewhere_info->prewhere_actions->execute(header);
|
||||
if (query_info.prewhere_info->remove_prewhere_column)
|
||||
header.erase(query_info.prewhere_info->prewhere_column_name);
|
||||
}
|
||||
return header;
|
||||
}
|
||||
case QueryProcessingStage::WithMergeableState:
|
||||
case QueryProcessingStage::Complete:
|
||||
case QueryProcessingStage::WithMergeableStateAfterAggregation:
|
||||
case QueryProcessingStage::MAX:
|
||||
{
|
||||
auto query = query_info.query->clone();
|
||||
removeJoin(*query->as<ASTSelectQuery>());
|
||||
|
||||
auto stream = std::make_shared<OneBlockInputStream>(
|
||||
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()));
|
||||
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
||||
}
|
||||
}
|
||||
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
void StorageMerge::convertingSourceStream(
|
||||
const Block & header,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
|
@ -76,13 +76,6 @@ protected:
|
||||
const String & table_name_regexp_,
|
||||
const Context & context_);
|
||||
|
||||
Block getQueryHeader(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage);
|
||||
|
||||
Pipe createSources(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
|
158
src/Storages/StorageProxy.h
Normal file
158
src/Storages/StorageProxy.h
Normal file
@ -0,0 +1,158 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Processors/Pipe.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageProxy : public IStorage
|
||||
{
|
||||
public:
|
||||
|
||||
StorageProxy(const StorageID & table_id_) : IStorage(table_id_) {}
|
||||
|
||||
virtual StoragePtr getNested() const = 0;
|
||||
|
||||
String getName() const override { return "StorageProxy"; }
|
||||
|
||||
bool isRemote() const override { return getNested()->isRemote(); }
|
||||
bool isView() const override { return getNested()->isView(); }
|
||||
bool supportsSampling() const override { return getNested()->supportsSampling(); }
|
||||
bool supportsFinal() const override { return getNested()->supportsFinal(); }
|
||||
bool supportsPrewhere() const override { return getNested()->supportsPrewhere(); }
|
||||
bool supportsReplication() const override { return getNested()->supportsReplication(); }
|
||||
bool supportsParallelInsert() const override { return getNested()->supportsParallelInsert(); }
|
||||
bool supportsDeduplication() const override { return getNested()->supportsDeduplication(); }
|
||||
bool supportsSettings() const override { return getNested()->supportsSettings(); }
|
||||
bool noPushingToViews() const override { return getNested()->noPushingToViews(); }
|
||||
bool hasEvenlyDistributedRead() const override { return getNested()->hasEvenlyDistributedRead(); }
|
||||
|
||||
ColumnSizeByName getColumnSizes() const override { return getNested()->getColumnSizes(); }
|
||||
NamesAndTypesList getVirtuals() const override { return getNested()->getVirtuals(); }
|
||||
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & ast) const override
|
||||
{
|
||||
return getNested()->getQueryProcessingStage(context, to_stage, ast);
|
||||
}
|
||||
|
||||
BlockInputStreams watch(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override
|
||||
{
|
||||
return getNested()->watch(column_names, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
Pipe read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override
|
||||
{
|
||||
return getNested()->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
BlockOutputStreamPtr write(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const Context & context) override
|
||||
{
|
||||
return getNested()->write(query, metadata_snapshot, context);
|
||||
}
|
||||
|
||||
void drop() override { getNested()->drop(); }
|
||||
|
||||
void truncate(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const Context & context,
|
||||
TableExclusiveLockHolder & lock) override
|
||||
{
|
||||
getNested()->truncate(query, metadata_snapshot, context, lock);
|
||||
}
|
||||
|
||||
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override
|
||||
{
|
||||
getNested()->rename(new_path_to_table_data, new_table_id);
|
||||
IStorage::renameInMemory(new_table_id);
|
||||
}
|
||||
|
||||
void renameInMemory(const StorageID & new_table_id) override
|
||||
{
|
||||
getNested()->renameInMemory(new_table_id);
|
||||
IStorage::renameInMemory(new_table_id);
|
||||
}
|
||||
|
||||
void alter(const AlterCommands & params, const Context & context, TableLockHolder & alter_lock_holder) override
|
||||
{
|
||||
getNested()->alter(params, context, alter_lock_holder);
|
||||
IStorage::setInMemoryMetadata(getNested()->getInMemoryMetadata());
|
||||
}
|
||||
|
||||
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) const override
|
||||
{
|
||||
getNested()->checkAlterIsPossible(commands, settings);
|
||||
}
|
||||
|
||||
Pipe alterPartition(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const PartitionCommands & commands,
|
||||
const Context & context) override
|
||||
{
|
||||
return getNested()->alterPartition(query, metadata_snapshot, commands, context);
|
||||
}
|
||||
|
||||
void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override
|
||||
{
|
||||
getNested()->checkAlterPartitionIsPossible(commands, metadata_snapshot, settings);
|
||||
}
|
||||
|
||||
bool optimize(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const ASTPtr & partition,
|
||||
bool final,
|
||||
bool deduplicate,
|
||||
const Context & context) override
|
||||
{
|
||||
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, context);
|
||||
}
|
||||
|
||||
void mutate(const MutationCommands & commands, const Context & context) override { getNested()->mutate(commands, context); }
|
||||
|
||||
CancellationCode killMutation(const String & mutation_id) override { return getNested()->killMutation(mutation_id); }
|
||||
|
||||
void startup() override { getNested()->startup(); }
|
||||
void shutdown() override { getNested()->shutdown(); }
|
||||
|
||||
ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); }
|
||||
|
||||
bool supportsIndexForIn() const override { return getNested()->supportsIndexForIn(); }
|
||||
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override
|
||||
{
|
||||
return getNested()->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
|
||||
}
|
||||
|
||||
CheckResults checkData(const ASTPtr & query , const Context & context) override { return getNested()->checkData(query, context); }
|
||||
void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); }
|
||||
void checkPartitionCanBeDropped(const ASTPtr & partition) override { getNested()->checkPartitionCanBeDropped(partition); }
|
||||
Strings getDataPaths() const override { return getNested()->getDataPaths(); }
|
||||
StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }
|
||||
std::optional<UInt64> totalRows() const override { return getNested()->totalRows(); }
|
||||
std::optional<UInt64> totalBytes() const override { return getNested()->totalBytes(); }
|
||||
std::optional<UInt64> lifetimeRows() const override { return getNested()->lifetimeRows(); }
|
||||
std::optional<UInt64> lifetimeBytes() const override { return getNested()->lifetimeBytes(); }
|
||||
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
|
137
src/Storages/StorageTableFunction.h
Normal file
137
src/Storages/StorageTableFunction.h
Normal file
@ -0,0 +1,137 @@
|
||||
#pragma once
|
||||
#include <Storages/IStorage.h>
|
||||
#include <TableFunctions/ITableFunction.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <Storages/StorageProxy.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Processors/Transforms/ConvertingTransform.h>
|
||||
#include <Interpreters/getHeaderForProcessingStage.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INCOMPATIBLE_COLUMNS;
|
||||
}
|
||||
|
||||
using GetNestedStorageFunc = std::function<StoragePtr()>;
|
||||
|
||||
/// Lazily creates underlying storage.
|
||||
/// Adds ConversionTransform in case of structure mismatch.
|
||||
class StorageTableFunctionProxy final : public StorageProxy
|
||||
{
|
||||
public:
|
||||
StorageTableFunctionProxy(const StorageID & table_id_, GetNestedStorageFunc get_nested_,
|
||||
ColumnsDescription cached_columns, bool add_conversion_ = true)
|
||||
: StorageProxy(table_id_), get_nested(std::move(get_nested_)), add_conversion(add_conversion_)
|
||||
{
|
||||
StorageInMemoryMetadata cached_metadata;
|
||||
cached_metadata.setColumns(std::move(cached_columns));
|
||||
setInMemoryMetadata(cached_metadata);
|
||||
}
|
||||
|
||||
StoragePtr getNested() const override
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
return nested;
|
||||
|
||||
auto nested_storage = get_nested();
|
||||
nested_storage->startup();
|
||||
nested_storage->renameInMemory(getStorageID());
|
||||
nested = nested_storage;
|
||||
get_nested = {};
|
||||
return nested;
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
return nested->getName();
|
||||
return StorageProxy::getName();
|
||||
}
|
||||
|
||||
void startup() override { }
|
||||
void shutdown() override
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
nested->shutdown();
|
||||
}
|
||||
|
||||
void drop() override
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
nested->drop();
|
||||
}
|
||||
|
||||
Pipe read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override
|
||||
{
|
||||
String cnames;
|
||||
for (const auto & c : column_names)
|
||||
cnames += c + " ";
|
||||
auto storage = getNested();
|
||||
auto nested_metadata = storage->getInMemoryMetadataPtr();
|
||||
auto pipe = storage->read(column_names, nested_metadata, query_info, context,
|
||||
processed_stage, max_block_size, num_streams);
|
||||
if (!pipe.empty() && add_conversion)
|
||||
{
|
||||
auto to_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot,
|
||||
query_info, context, processed_stage);
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<ConvertingTransform>(
|
||||
header,
|
||||
to_header,
|
||||
ConvertingTransform::MatchColumnsMode::Name);
|
||||
});
|
||||
}
|
||||
return pipe;
|
||||
}
|
||||
|
||||
BlockOutputStreamPtr write(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const Context & context) override
|
||||
{
|
||||
auto storage = getNested();
|
||||
auto cached_structure = metadata_snapshot->getSampleBlock();
|
||||
auto actual_structure = storage->getInMemoryMetadataPtr()->getSampleBlock();
|
||||
if (!blocksHaveEqualStructure(actual_structure, cached_structure) && add_conversion)
|
||||
{
|
||||
throw Exception("Source storage and table function have different structure", ErrorCodes::INCOMPATIBLE_COLUMNS);
|
||||
}
|
||||
return storage->write(query, metadata_snapshot, context);
|
||||
}
|
||||
|
||||
void renameInMemory(const StorageID & new_table_id) override
|
||||
{
|
||||
std::lock_guard lock{nested_mutex};
|
||||
if (nested)
|
||||
StorageProxy::renameInMemory(new_table_id);
|
||||
else
|
||||
IStorage::renameInMemory(new_table_id);
|
||||
}
|
||||
|
||||
bool isView() const override { return false; }
|
||||
void checkTableCanBeDropped() const override {}
|
||||
|
||||
private:
|
||||
mutable std::mutex nested_mutex;
|
||||
mutable GetNestedStorageFunc get_nested;
|
||||
mutable StoragePtr nested;
|
||||
const bool add_conversion;
|
||||
};
|
||||
|
||||
}
|
@ -34,6 +34,7 @@ public:
|
||||
|
||||
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
|
||||
|
||||
std::string getName() const override;
|
||||
private:
|
||||
|
||||
BridgeHelperPtr bridge_helper;
|
||||
@ -61,8 +62,6 @@ private:
|
||||
size_t max_block_size) const override;
|
||||
|
||||
Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const override;
|
||||
|
||||
std::string getName() const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -38,10 +38,8 @@ ColumnsDescription getStructureOfRemoteTableInShard(
|
||||
{
|
||||
if (shard_info.isLocal())
|
||||
{
|
||||
const auto * table_function = table_func_ptr->as<ASTFunction>();
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
|
||||
auto storage_ptr = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
|
||||
return storage_ptr->getInMemoryMetadataPtr()->getColumns();
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
|
||||
return table_function_ptr->getActualTableStructure(context);
|
||||
}
|
||||
|
||||
auto table_func_name = queryToString(table_func_ptr);
|
||||
|
@ -6,3 +6,4 @@ list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFuncti
|
||||
|
||||
add_library(clickhouse_table_functions ${clickhouse_table_functions_sources})
|
||||
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <TableFunctions/ITableFunction.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageTableFunction.h>
|
||||
#include <Access/AccessFlags.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
@ -13,11 +14,23 @@ namespace ProfileEvents
|
||||
namespace DB
|
||||
{
|
||||
|
||||
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name,
|
||||
ColumnsDescription cached_columns) const
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
|
||||
context.checkAccess(AccessType::CREATE_TEMPORARY_TABLE | StorageFactory::instance().getSourceAccessType(getStorageTypeName()));
|
||||
return executeImpl(ast_function, context, table_name);
|
||||
|
||||
if (cached_columns.empty() || (hasStaticStructure() && cached_columns == getActualTableStructure(context)))
|
||||
return executeImpl(ast_function, context, table_name, std::move(cached_columns));
|
||||
|
||||
auto get_storage = [=, tf = shared_from_this()]() -> StoragePtr
|
||||
{
|
||||
return tf->executeImpl(ast_function, context, table_name, cached_columns);
|
||||
};
|
||||
|
||||
/// It will request actual table structure and create underlying storage lazily
|
||||
return std::make_shared<StorageTableFunctionProxy>(StorageID(getDatabaseName(), table_name), std::move(get_storage),
|
||||
std::move(cached_columns), needStructureConversion());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
@ -21,9 +22,18 @@ class Context;
|
||||
* Example:
|
||||
* SELECT count() FROM remote('example01-01-1', merge, hits)
|
||||
* - go to `example01-01-1`, in `merge` database, `hits` table.
|
||||
*
|
||||
*
|
||||
* When creating table AS table_function(...) we probably don't know structure of the table
|
||||
* and have to request if from remote server, because structure is required to create a Storage.
|
||||
* To avoid failures on server startup, we write obtained structure to metadata file.
|
||||
* So, table function may have two different columns lists:
|
||||
* - cached_columns written to metadata
|
||||
* - the list returned from getActualTableStructure(...)
|
||||
* See StorageTableFunctionProxy.
|
||||
*/
|
||||
|
||||
class ITableFunction
|
||||
class ITableFunction : public std::enable_shared_from_this<ITableFunction>
|
||||
{
|
||||
public:
|
||||
static inline std::string getDatabaseName() { return "_table_function"; }
|
||||
@ -31,13 +41,24 @@ public:
|
||||
/// Get the main function name.
|
||||
virtual std::string getName() const = 0;
|
||||
|
||||
/// Returns true if we always know table structure when executing table function
|
||||
/// (e.g. structure is specified in table function arguments)
|
||||
virtual bool hasStaticStructure() const { return false; }
|
||||
/// Returns false if storage returned by table function supports type conversion (e.g. StorageDistributed)
|
||||
virtual bool needStructureConversion() const { return true; }
|
||||
|
||||
virtual void parseArguments(const ASTPtr & /*ast_function*/, const Context & /*context*/) {}
|
||||
|
||||
/// Returns actual table structure probably requested from remote server, may fail
|
||||
virtual ColumnsDescription getActualTableStructure(const Context & /*context*/) const = 0;
|
||||
|
||||
/// Create storage according to the query.
|
||||
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const;
|
||||
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns_ = {}) const;
|
||||
|
||||
virtual ~ITableFunction() {}
|
||||
|
||||
private:
|
||||
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const = 0;
|
||||
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;
|
||||
virtual const char * getStorageTypeName() const = 0;
|
||||
};
|
||||
|
||||
|
@ -9,11 +9,12 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
#include <Storages/StorageFile.h>
|
||||
#include <Storages/Distributed/DirectoryMonitor.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -21,9 +22,10 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
}
|
||||
|
||||
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
/// Parse args
|
||||
ASTs & args_func = ast_function->children;
|
||||
@ -39,8 +41,8 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, cons
|
||||
for (auto & arg : args)
|
||||
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
|
||||
|
||||
std::string filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
std::string format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
if (args.size() == 2 && getName() == "file")
|
||||
{
|
||||
@ -51,24 +53,33 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, cons
|
||||
throw Exception("Table function '" + getName() + "' requires 3 or 4 arguments: filename, format, structure and compression method (default auto).",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
ColumnsDescription columns;
|
||||
std::string compression_method = "auto";
|
||||
|
||||
if (args.size() > 2)
|
||||
{
|
||||
auto structure = args[2]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
columns = parseColumnsListFromString(structure, context);
|
||||
}
|
||||
structure = args[2]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
if (args.size() == 4)
|
||||
compression_method = args[3]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
/// Create table
|
||||
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
auto columns = getActualTableStructure(context);
|
||||
StoragePtr storage = getStorage(filename, format, columns, const_cast<Context &>(context), table_name, compression_method);
|
||||
|
||||
storage->startup();
|
||||
|
||||
return storage;
|
||||
}
|
||||
|
||||
ColumnsDescription ITableFunctionFileLike::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
if (structure.empty())
|
||||
{
|
||||
assert(getName() == "file" && format == "Distributed");
|
||||
Strings paths = StorageFile::getPathsList(filename, context.getUserFilesPath(), context);
|
||||
if (paths.empty())
|
||||
throw Exception("Cannot get table structure from file, because no files match specified name", ErrorCodes::INCORRECT_FILE_NAME);
|
||||
auto read_stream = StorageDistributedDirectoryMonitor::createStreamFromFile(paths[0]);
|
||||
return ColumnsDescription{read_stream->getHeader().getNamesAndTypesList()};
|
||||
}
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -13,8 +13,21 @@ class Context;
|
||||
class ITableFunctionFileLike : public ITableFunction
|
||||
{
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
|
||||
virtual StoragePtr getStorage(
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const = 0;
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const String & compression_method) const = 0;
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
|
||||
String filename;
|
||||
String format;
|
||||
String structure;
|
||||
String compression_method = "auto";
|
||||
};
|
||||
}
|
||||
|
@ -24,11 +24,10 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void ITableFunctionXDBC::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
const auto & args_func = ast_function->as<ASTFunction &>();
|
||||
|
||||
@ -44,10 +43,6 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
|
||||
for (auto & arg : args)
|
||||
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
|
||||
|
||||
std::string connection_string;
|
||||
std::string schema_name;
|
||||
std::string remote_table_name;
|
||||
|
||||
if (args.size() == 3)
|
||||
{
|
||||
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
@ -60,11 +55,16 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
|
||||
remote_table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
/* Infer external table structure */
|
||||
/// Have to const_cast, because bridges store their commands inside context
|
||||
BridgeHelperPtr helper = createBridgeHelper(const_cast<Context &>(context), context.getSettingsRef().http_receive_timeout.value, connection_string);
|
||||
helper = createBridgeHelper(const_cast<Context &>(context), context.getSettingsRef().http_receive_timeout.value, connection_string);
|
||||
helper->startBridgeSync();
|
||||
}
|
||||
|
||||
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
assert(helper);
|
||||
|
||||
/* Infer external table structure */
|
||||
Poco::URI columns_info_uri = helper->getColumnsInfoURI();
|
||||
columns_info_uri.addQueryParameter("connection_string", connection_string);
|
||||
if (!schema_name.empty())
|
||||
@ -73,7 +73,7 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
|
||||
|
||||
const auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
|
||||
columns_info_uri.addQueryParameter("external_table_functions_use_nulls",
|
||||
Poco::NumberFormatter::format(use_nulls));
|
||||
Poco::NumberFormatter::format(use_nulls));
|
||||
|
||||
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
|
||||
|
||||
@ -81,11 +81,14 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
|
||||
readStringBinary(columns_info, buf);
|
||||
NamesAndTypesList columns = NamesAndTypesList::parse(columns_info);
|
||||
|
||||
auto result = std::make_shared<StorageXDBC>(StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, ColumnsDescription{columns}, context, helper);
|
||||
|
||||
if (!result)
|
||||
throw Exception("Failed to instantiate storage from table function " + getName(), ErrorCodes::UNKNOWN_EXCEPTION);
|
||||
return ColumnsDescription{columns};
|
||||
}
|
||||
|
||||
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
assert(helper);
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto result = std::make_shared<StorageXDBC>(StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, context, helper);
|
||||
result->startup();
|
||||
return result;
|
||||
}
|
||||
|
@ -18,12 +18,21 @@ namespace DB
|
||||
class ITableFunctionXDBC : public ITableFunction
|
||||
{
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
|
||||
/* A factory method to create bridge helper, that will assist in remote interaction */
|
||||
virtual BridgeHelperPtr createBridgeHelper(Context & context,
|
||||
const Poco::Timespan & http_timeout_,
|
||||
const std::string & connection_string_) const = 0;
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
String connection_string;
|
||||
String schema_name;
|
||||
String remote_table_name;
|
||||
BridgeHelperPtr helper;
|
||||
};
|
||||
|
||||
class TableFunctionJDBC : public ITableFunctionXDBC
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -28,19 +29,21 @@ void TableFunctionFactory::registerFunction(const std::string & name, Value crea
|
||||
}
|
||||
|
||||
TableFunctionPtr TableFunctionFactory::get(
|
||||
const std::string & name,
|
||||
const ASTPtr & ast_function,
|
||||
const Context & context) const
|
||||
{
|
||||
auto res = tryGet(name, context);
|
||||
const auto * table_function = ast_function->as<ASTFunction>();
|
||||
auto res = tryGet(table_function->name, context);
|
||||
if (!res)
|
||||
{
|
||||
auto hints = getHints(name);
|
||||
auto hints = getHints(table_function->name);
|
||||
if (!hints.empty())
|
||||
throw Exception("Unknown table function " + name + ". Maybe you meant: " + toString(hints), ErrorCodes::UNKNOWN_FUNCTION);
|
||||
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}. Maybe you meant: {}", table_function->name , toString(hints));
|
||||
else
|
||||
throw Exception("Unknown table function " + name, ErrorCodes::UNKNOWN_FUNCTION);
|
||||
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}", table_function->name);
|
||||
}
|
||||
|
||||
res->parseArguments(ast_function, context);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
}
|
||||
|
||||
/// Throws an exception if not found.
|
||||
TableFunctionPtr get(const std::string & name, const Context & context) const;
|
||||
TableFunctionPtr get(const ASTPtr & ast_function, const Context & context) const;
|
||||
|
||||
/// Returns nullptr if not found.
|
||||
TableFunctionPtr tryGet(const std::string & name, const Context & context) const;
|
||||
|
@ -9,9 +9,10 @@
|
||||
namespace DB
|
||||
{
|
||||
StoragePtr TableFunctionFile::getStorage(
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const std::string & compression_method) const
|
||||
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const std::string & compression_method_) const
|
||||
{
|
||||
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format, compression_method, columns, ConstraintsDescription{}, global_context};
|
||||
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format_, compression_method_, columns, ConstraintsDescription{}, global_context};
|
||||
|
||||
return StorageFile::create(source, global_context.getUserFilesPath(), args);
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ public:
|
||||
|
||||
private:
|
||||
StoragePtr getStorage(
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const std::string & compression_method) const override;
|
||||
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const std::string & compression_method_) const override;
|
||||
const char * getStorageTypeName() const override { return "File"; }
|
||||
};}
|
||||
|
@ -26,7 +26,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, const Context & /*context*/)
|
||||
{
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
@ -58,11 +58,7 @@ StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & ast_function,
|
||||
}
|
||||
|
||||
/// Parsing first argument as table structure and creating a sample block
|
||||
std::string structure = args[0]->as<const ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
UInt64 max_string_length = 10;
|
||||
UInt64 max_array_length = 10;
|
||||
std::optional<UInt64> random_seed;
|
||||
structure = args[0]->as<const ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
if (args.size() >= 2)
|
||||
{
|
||||
@ -76,10 +72,16 @@ StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & ast_function,
|
||||
|
||||
if (args.size() == 4)
|
||||
max_array_length = args[3]->as<const ASTLiteral &>().value.safeGet<UInt64>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
ColumnsDescription columns = parseColumnsListFromString(structure, context);
|
||||
|
||||
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto res = StorageGenerateRandom::create(StorageID(getDatabaseName(), table_name), columns, max_array_length, max_string_length, random_seed);
|
||||
res->startup();
|
||||
return res;
|
||||
|
@ -13,9 +13,19 @@ class TableFunctionGenerateRandom : public ITableFunction
|
||||
public:
|
||||
static constexpr auto name = "generateRandom";
|
||||
std::string getName() const override { return name; }
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "GenerateRandom"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
String structure;
|
||||
UInt64 max_string_length = 10;
|
||||
UInt64 max_array_length = 10;
|
||||
std::optional<UInt64> random_seed;
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
@ -10,15 +10,17 @@
|
||||
namespace DB
|
||||
{
|
||||
StoragePtr TableFunctionHDFS::getStorage(
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
|
||||
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const String & compression_method_) const
|
||||
{
|
||||
return StorageHDFS::create(source,
|
||||
return StorageHDFS::create(
|
||||
source,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
format,
|
||||
format_,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
global_context,
|
||||
compression_method);
|
||||
compression_method_);
|
||||
}
|
||||
|
||||
|
||||
|
@ -26,7 +26,8 @@ public:
|
||||
|
||||
private:
|
||||
StoragePtr getStorage(
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const override;
|
||||
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const String & compression_method_) const override;
|
||||
const char * getStorageTypeName() const override { return "HDFS"; }
|
||||
};
|
||||
|
||||
|
@ -22,7 +22,7 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void TableFunctionInput::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
const auto * function = ast_function->as<ASTFunction>();
|
||||
|
||||
@ -35,12 +35,18 @@ StoragePtr TableFunctionInput::executeImpl(const ASTPtr & ast_function, const Co
|
||||
throw Exception("Table function '" + getName() + "' requires exactly 1 argument: structure",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
String structure = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context)->as<ASTLiteral &>().value.safeGet<String>();
|
||||
auto columns = parseColumnsListFromString(structure, context);
|
||||
StoragePtr storage = StorageInput::create(StorageID(getDatabaseName(), table_name), columns);
|
||||
structure = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context)->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionInput::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
auto storage = StorageInput::create(StorageID(getDatabaseName(), table_name), getActualTableStructure(context));
|
||||
storage->startup();
|
||||
|
||||
return storage;
|
||||
}
|
||||
|
||||
|
@ -15,10 +15,16 @@ class TableFunctionInput : public ITableFunction
|
||||
public:
|
||||
static constexpr auto name = "input";
|
||||
std::string getName() const override { return name; }
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "Input"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
String structure;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -45,8 +45,7 @@ static NamesAndTypesList chooseColumns(const String & source_database, const Str
|
||||
return any_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical();
|
||||
}
|
||||
|
||||
|
||||
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
@ -65,15 +64,24 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Co
|
||||
args[0] = evaluateConstantExpressionForDatabaseName(args[0], context);
|
||||
args[1] = evaluateConstantExpressionAsLiteral(args[1], context);
|
||||
|
||||
String source_database = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
String table_name_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
source_database = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
table_name_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionMerge::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
return ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)};
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
auto res = StorageMerge::create(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)},
|
||||
getActualTableStructure(context),
|
||||
source_database,
|
||||
table_name_regexp,
|
||||
context);
|
||||
|
||||
res->startup();
|
||||
return res;
|
||||
}
|
||||
|
@ -16,8 +16,14 @@ public:
|
||||
static constexpr auto name = "merge";
|
||||
std::string getName() const override { return name; }
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "Merge"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
String source_database;
|
||||
String table_name_regexp;
|
||||
};
|
||||
|
||||
|
||||
|
@ -24,8 +24,6 @@
|
||||
|
||||
# include <Databases/MySQL/DatabaseConnectionMySQL.h> // for fetchTablesColumnsList
|
||||
|
||||
# include <mysqlxx/Pool.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -38,8 +36,7 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_TABLE;
|
||||
}
|
||||
|
||||
|
||||
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
const auto & args_func = ast_function->as<ASTFunction &>();
|
||||
|
||||
@ -55,14 +52,12 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
|
||||
for (auto & arg : args)
|
||||
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
|
||||
|
||||
std::string host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
std::string remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
std::string remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
std::string user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
std::string password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
String host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
bool replace_query = false;
|
||||
std::string on_duplicate_clause;
|
||||
if (args.size() >= 6)
|
||||
replace_query = args[5]->as<ASTLiteral &>().value.safeGet<UInt64>() > 0;
|
||||
if (args.size() == 7)
|
||||
@ -74,27 +69,46 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
/// 3306 is the default MySQL port number
|
||||
auto parsed_host_port = parseAddress(host_port, 3306);
|
||||
parsed_host_port = parseAddress(host_port, 3306);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionMySQL::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
assert(!parsed_host_port.first.empty());
|
||||
if (!pool)
|
||||
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
|
||||
|
||||
mysqlxx::Pool pool(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto tables_and_columns = fetchTablesColumnsList(pool, remote_database_name, {remote_table_name}, settings.external_table_functions_use_nulls, settings.mysql_datatypes_support_level);
|
||||
const auto tables_and_columns = fetchTablesColumnsList(*pool, remote_database_name, {remote_table_name}, settings.external_table_functions_use_nulls, settings.mysql_datatypes_support_level);
|
||||
|
||||
const auto columns = tables_and_columns.find(remote_table_name);
|
||||
if (columns == tables_and_columns.end())
|
||||
throw Exception("MySQL table " + backQuoteIfNeed(remote_database_name) + "." + backQuoteIfNeed(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
|
||||
return ColumnsDescription{columns->second};
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
assert(!parsed_host_port.first.empty());
|
||||
if (!pool)
|
||||
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
|
||||
|
||||
auto columns = getActualTableStructure(context);
|
||||
|
||||
auto res = StorageMySQL::create(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
std::move(pool),
|
||||
std::move(*pool),
|
||||
remote_database_name,
|
||||
remote_table_name,
|
||||
replace_query,
|
||||
on_duplicate_clause,
|
||||
ColumnsDescription{columns->second},
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
context);
|
||||
|
||||
pool.reset();
|
||||
|
||||
res->startup();
|
||||
return res;
|
||||
}
|
||||
|
@ -1,6 +1,11 @@
|
||||
#pragma once
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
# include "config_core.h"
|
||||
#endif
|
||||
|
||||
#if USE_MYSQL
|
||||
#include <TableFunctions/ITableFunction.h>
|
||||
#include <mysqlxx/Pool.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -19,8 +24,23 @@ public:
|
||||
return name;
|
||||
}
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "MySQL"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
std::pair<std::string, UInt16> parsed_host_port;
|
||||
String remote_database_name;
|
||||
String remote_table_name;
|
||||
String user_name;
|
||||
String password;
|
||||
bool replace_query = false;
|
||||
String on_duplicate_clause;
|
||||
|
||||
mutable std::optional<mysqlxx::Pool> pool;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -17,23 +17,30 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionNull::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void TableFunctionNull::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
if (const auto * function = ast_function->as<ASTFunction>())
|
||||
{
|
||||
auto arguments = function->arguments->children;
|
||||
const auto * function = ast_function->as<ASTFunction>();
|
||||
if (!function || !function->arguments)
|
||||
throw Exception("Table function '" + getName() + "' requires 'structure'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
if (arguments.size() != 1)
|
||||
throw Exception("Table function '" + getName() + "' requires 'structure'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
const auto & arguments = function->arguments->children;
|
||||
if (arguments.size() != 1)
|
||||
throw Exception("Table function '" + getName() + "' requires 'structure'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
auto structure = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[0], context)->as<ASTLiteral>()->value.safeGet<String>();
|
||||
ColumnsDescription columns = parseColumnsListFromString(structure, context);
|
||||
structure = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[0], context)->as<ASTLiteral>()->value.safeGet<String>();
|
||||
}
|
||||
|
||||
auto res = StorageNull::create(StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription());
|
||||
res->startup();
|
||||
return res;
|
||||
}
|
||||
throw Exception("Table function '" + getName() + "' requires 'structure'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
ColumnsDescription TableFunctionNull::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionNull::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto res = StorageNull::create(StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription());
|
||||
res->startup();
|
||||
return res;
|
||||
}
|
||||
|
||||
void registerTableFunctionNull(TableFunctionFactory & factory)
|
||||
|
@ -17,8 +17,13 @@ public:
|
||||
static constexpr auto name = "null";
|
||||
std::string getName() const override { return name; }
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "Null"; }
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
|
||||
String structure;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Storages/System/StorageSystemNumbers.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include "registerTableFunctions.h"
|
||||
|
||||
|
||||
@ -18,8 +19,16 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
|
||||
template <bool multithreaded>
|
||||
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(const Context & /*context*/) const
|
||||
{
|
||||
/// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418
|
||||
return ColumnsDescription{{{"number", std::make_shared<DataTypeUInt64>()}}};
|
||||
}
|
||||
|
||||
template <bool multithreaded>
|
||||
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
if (const auto * function = ast_function->as<ASTFunction>())
|
||||
{
|
||||
@ -28,7 +37,6 @@ StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_f
|
||||
if (arguments.size() != 1 && arguments.size() != 2)
|
||||
throw Exception("Table function '" + getName() + "' requires 'length' or 'offset, length'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
|
||||
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
|
||||
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
|
||||
|
||||
|
@ -17,11 +17,14 @@ class TableFunctionNumbers : public ITableFunction
|
||||
public:
|
||||
static constexpr auto name = multithreaded ? "numbers_mt" : "numbers";
|
||||
std::string getName() const override { return name; }
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "SystemNumbers"; }
|
||||
|
||||
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
};
|
||||
|
||||
|
||||
|
@ -27,7 +27,8 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
|
||||
void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
@ -44,7 +45,6 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
|
||||
String cluster_description;
|
||||
String remote_database;
|
||||
String remote_table;
|
||||
ASTPtr remote_table_function_ptr;
|
||||
String username;
|
||||
String password;
|
||||
|
||||
@ -136,7 +136,6 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
|
||||
for (auto ast : args)
|
||||
setIdentifierSpecial(ast);
|
||||
|
||||
ClusterPtr cluster;
|
||||
if (!cluster_name.empty())
|
||||
{
|
||||
/// Use an existing cluster from the main config
|
||||
@ -189,30 +188,54 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
|
||||
if (!remote_table_function_ptr && remote_table.empty())
|
||||
throw Exception("The name of remote table cannot be empty", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
auto remote_table_id = StorageID::createEmpty();
|
||||
remote_table_id.database_name = remote_database;
|
||||
remote_table_id.table_name = remote_table;
|
||||
auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const
|
||||
{
|
||||
/// StorageDistributed supports mismatching structure of remote table, so we can use outdated structure for CREATE ... AS remote(...)
|
||||
/// without additional conversion in StorageTableFunctionProxy
|
||||
if (cached_columns.empty())
|
||||
cached_columns = getActualTableStructure(context);
|
||||
|
||||
assert(cluster);
|
||||
StoragePtr res = remote_table_function_ptr
|
||||
? StorageDistributed::createWithOwnCluster(
|
||||
? StorageDistributed::create(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
structure_remote_table,
|
||||
cached_columns,
|
||||
ConstraintsDescription{},
|
||||
remote_table_function_ptr,
|
||||
cluster,
|
||||
context)
|
||||
: StorageDistributed::createWithOwnCluster(
|
||||
String{},
|
||||
context,
|
||||
ASTPtr{},
|
||||
String{},
|
||||
String{},
|
||||
false,
|
||||
cluster)
|
||||
: StorageDistributed::create(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
structure_remote_table,
|
||||
remote_database,
|
||||
remote_table,
|
||||
cluster,
|
||||
context);
|
||||
cached_columns,
|
||||
ConstraintsDescription{},
|
||||
remote_table_id.database_name,
|
||||
remote_table_id.table_name,
|
||||
String{},
|
||||
context,
|
||||
ASTPtr{},
|
||||
String{},
|
||||
String{},
|
||||
false,
|
||||
cluster);
|
||||
|
||||
res->startup();
|
||||
return res;
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionRemote::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
assert(cluster);
|
||||
return getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
|
||||
}
|
||||
|
||||
TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_)
|
||||
: name{name_}, secure{secure_}
|
||||
|
@ -1,6 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <TableFunctions/ITableFunction.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
#include <Interpreters/StorageID.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -20,14 +22,24 @@ public:
|
||||
|
||||
std::string getName() const override { return name; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
|
||||
bool needStructureConversion() const override { return false; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "Distributed"; }
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
std::string name;
|
||||
bool is_cluster_function;
|
||||
std::string help_message;
|
||||
bool secure;
|
||||
|
||||
ClusterPtr cluster;
|
||||
StorageID remote_table_id = StorageID::createEmpty();
|
||||
ASTPtr remote_table_function_ptr;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, const Context & context)
|
||||
{
|
||||
/// Parse args
|
||||
ASTs & args_func = ast_function->children;
|
||||
@ -38,11 +38,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Conte
|
||||
for (auto & arg : args)
|
||||
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
|
||||
|
||||
String filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
String format;
|
||||
String structure;
|
||||
String access_key_id;
|
||||
String secret_access_key;
|
||||
filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
if (args.size() < 5)
|
||||
{
|
||||
@ -57,47 +53,38 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Conte
|
||||
structure = args[4]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
String compression_method;
|
||||
if (args.size() == 4 || args.size() == 6)
|
||||
compression_method = args.back()->as<ASTLiteral &>().value.safeGet<String>();
|
||||
else
|
||||
compression_method = "auto";
|
||||
}
|
||||
|
||||
ColumnsDescription columns = parseColumnsListFromString(structure, context);
|
||||
ColumnsDescription TableFunctionS3::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
/// Create table
|
||||
StoragePtr storage = getStorage(filename, access_key_id, secret_access_key, format, columns, const_cast<Context &>(context), table_name, compression_method);
|
||||
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
Poco::URI uri (filename);
|
||||
S3::URI s3_uri (uri);
|
||||
UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
|
||||
|
||||
StoragePtr storage = StorageS3::create(
|
||||
s3_uri,
|
||||
access_key_id,
|
||||
secret_access_key,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
format,
|
||||
min_upload_part_size,
|
||||
getActualTableStructure(context),
|
||||
ConstraintsDescription{},
|
||||
const_cast<Context &>(context),
|
||||
compression_method);
|
||||
|
||||
storage->startup();
|
||||
|
||||
return storage;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionS3::getStorage(
|
||||
const String & source,
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const String & format,
|
||||
const ColumnsDescription & columns,
|
||||
Context & global_context,
|
||||
const std::string & table_name,
|
||||
const String & compression_method)
|
||||
{
|
||||
Poco::URI uri (source);
|
||||
S3::URI s3_uri (uri);
|
||||
UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
|
||||
return StorageS3::create(
|
||||
s3_uri,
|
||||
access_key_id,
|
||||
secret_access_key,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
format,
|
||||
min_upload_part_size,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
global_context,
|
||||
compression_method);
|
||||
}
|
||||
|
||||
void registerTableFunctionS3(TableFunctionFactory & factory)
|
||||
{
|
||||
|
@ -22,24 +22,26 @@ public:
|
||||
{
|
||||
return name;
|
||||
}
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
|
||||
protected:
|
||||
StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function,
|
||||
const Context & context,
|
||||
const std::string & table_name) const override;
|
||||
|
||||
static StoragePtr getStorage(
|
||||
const String & source,
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const String & format,
|
||||
const ColumnsDescription & columns,
|
||||
Context & global_context,
|
||||
const std::string & table_name,
|
||||
const String & compression_method);
|
||||
ColumnsDescription cached_columns) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "S3"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
String filename;
|
||||
String format;
|
||||
String structure;
|
||||
String access_key_id;
|
||||
String secret_access_key;
|
||||
String compression_method = "auto";
|
||||
};
|
||||
|
||||
class TableFunctionCOS : public TableFunctionS3
|
||||
|
@ -10,10 +10,12 @@
|
||||
namespace DB
|
||||
{
|
||||
StoragePtr TableFunctionURL::getStorage(
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
|
||||
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const String & compression_method_) const
|
||||
{
|
||||
Poco::URI uri(source);
|
||||
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), format, columns, ConstraintsDescription{}, global_context, compression_method);
|
||||
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), format_, columns, ConstraintsDescription{},
|
||||
global_context, compression_method_);
|
||||
}
|
||||
|
||||
void registerTableFunctionURL(TableFunctionFactory & factory)
|
||||
|
@ -21,7 +21,8 @@ public:
|
||||
|
||||
private:
|
||||
StoragePtr getStorage(
|
||||
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const override;
|
||||
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
|
||||
const std::string & table_name, const String & compression_method_) const override;
|
||||
const char * getStorageTypeName() const override { return "URL"; }
|
||||
};
|
||||
|
||||
|
@ -62,8 +62,10 @@ static void parseAndInsertValues(MutableColumns & res_columns, const ASTs & args
|
||||
}
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void TableFunctionValues::parseArguments(const ASTPtr & ast_function, const Context & /*context*/)
|
||||
{
|
||||
|
||||
|
||||
ASTs & args_func = ast_function->children;
|
||||
|
||||
if (args_func.size() != 1)
|
||||
@ -83,9 +85,18 @@ StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const C
|
||||
"Got '{}' instead", getName(), args[0]->formatForErrorMessage()),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
std::string structure = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
ColumnsDescription columns = parseColumnsListFromString(structure, context);
|
||||
structure = args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionValues::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
auto columns = getActualTableStructure(context);
|
||||
|
||||
Block sample_block;
|
||||
for (const auto & name_type : columns.getOrdinary())
|
||||
@ -93,6 +104,8 @@ StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const C
|
||||
|
||||
MutableColumns res_columns = sample_block.cloneEmptyColumns();
|
||||
|
||||
ASTs & args = ast_function->children.at(0)->children;
|
||||
|
||||
/// Parsing other arguments as values and inserting them into columns
|
||||
parseAndInsertValues(res_columns, args, sample_block, context);
|
||||
|
||||
|
@ -12,9 +12,15 @@ class TableFunctionValues : public ITableFunction
|
||||
public:
|
||||
static constexpr auto name = "values";
|
||||
std::string getName() const override { return name; }
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "Values"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
|
||||
String structure;
|
||||
};
|
||||
|
||||
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Storages/StorageView.h>
|
||||
@ -16,24 +15,37 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionView::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
void TableFunctionView::parseArguments(const ASTPtr & ast_function, const Context & /*context*/)
|
||||
{
|
||||
if (const auto * function = ast_function->as<ASTFunction>())
|
||||
const auto * function = ast_function->as<ASTFunction>();
|
||||
if (function)
|
||||
{
|
||||
if (auto * select = function->tryGetQueryArgument())
|
||||
{
|
||||
auto sample = InterpreterSelectWithUnionQuery::getSampleBlock(function->arguments->children[0] /* ASTPtr */, context);
|
||||
auto columns = ColumnsDescription(sample.getNamesAndTypesList());
|
||||
ASTCreateQuery create;
|
||||
create.select = select;
|
||||
auto res = StorageView::create(StorageID(getDatabaseName(), table_name), create, columns);
|
||||
res->startup();
|
||||
return res;
|
||||
create.set(create.select, select->clone());
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw Exception("Table function '" + getName() + "' requires a query argument.", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionView::getActualTableStructure(const Context & context) const
|
||||
{
|
||||
assert(create.select);
|
||||
assert(create.children.size() == 1);
|
||||
assert(create.children[0]->as<ASTSelectWithUnionQuery>());
|
||||
auto sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.children[0], context);
|
||||
return ColumnsDescription(sample.getNamesAndTypesList());
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionView::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto res = StorageView::create(StorageID(getDatabaseName(), table_name), create, columns);
|
||||
res->startup();
|
||||
return res;
|
||||
}
|
||||
|
||||
void registerTableFunctionView(TableFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<TableFunctionView>();
|
||||
|
@ -1,9 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <TableFunctions/ITableFunction.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <common/types.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -17,10 +17,13 @@ public:
|
||||
static constexpr auto name = "view";
|
||||
std::string getName() const override { return name; }
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "View"; }
|
||||
|
||||
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
|
||||
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
|
||||
ASTCreateQuery create;
|
||||
};
|
||||
|
||||
|
||||
|
@ -3,8 +3,8 @@
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Storages/System/StorageSystemZeros.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include "registerTableFunctions.h"
|
||||
@ -18,8 +18,16 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
|
||||
template <bool multithreaded>
|
||||
StoragePtr TableFunctionZeros<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
|
||||
ColumnsDescription TableFunctionZeros<multithreaded>::getActualTableStructure(const Context & /*context*/) const
|
||||
{
|
||||
/// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418
|
||||
return ColumnsDescription{{{"zero", std::make_shared<DataTypeUInt8>()}}};
|
||||
}
|
||||
|
||||
template <bool multithreaded>
|
||||
StoragePtr TableFunctionZeros<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
if (const auto * function = ast_function->as<ASTFunction>())
|
||||
{
|
||||
|
@ -17,11 +17,14 @@ class TableFunctionZeros : public ITableFunction
|
||||
public:
|
||||
static constexpr auto name = multithreaded ? "zeros_mt" : "zeros";
|
||||
std::string getName() const override { return name; }
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const char * getStorageTypeName() const override { return "SystemZeros"; }
|
||||
|
||||
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
|
||||
|
||||
ColumnsDescription getActualTableStructure(const Context & context) const override;
|
||||
};
|
||||
|
||||
|
||||
|
2
tests/config/lsan_suppressions.txt
Normal file
2
tests/config/lsan_suppressions.txt
Normal file
@ -0,0 +1,2 @@
|
||||
# See https://bugs.llvm.org/show_bug.cgi?id=47418
|
||||
# leak:getActualTableStructure
|
@ -34,7 +34,7 @@ def test_single_file(started_cluster, cluster):
|
||||
|
||||
assert out == '1\ta\n2\tbb\n3\tccc\n'
|
||||
|
||||
query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin');" \
|
||||
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin');" \
|
||||
"select * from t"
|
||||
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
|
||||
|
||||
@ -57,7 +57,7 @@ def test_two_files(started_cluster, cluster):
|
||||
|
||||
assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n'
|
||||
|
||||
query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin');" \
|
||||
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin');" \
|
||||
"select * from t order by x"
|
||||
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
|
||||
|
||||
@ -77,7 +77,7 @@ def test_single_file_old(started_cluster, cluster):
|
||||
|
||||
assert out == '1\ta\n2\tbb\n3\tccc\n'
|
||||
|
||||
query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin');" \
|
||||
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin');" \
|
||||
"select * from t"
|
||||
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
|
||||
|
||||
|
@ -1,11 +1,11 @@
|
||||
CREATE TABLE default.file\n(\n `n` Int8\n)\nENGINE = File(\'TSVWithNamesAndTypes\')
|
||||
CREATE TABLE default.buffer\n(\n `n` Int8\n)\nENGINE = Buffer(\'default\', \'file\', 16, 10, 200, 10000, 1000000, 10000000, 1000000000)
|
||||
CREATE TABLE default.merge\n(\n `n` Int8\n)\nENGINE = Merge(\'default\', \'distributed\')
|
||||
CREATE TABLE default.merge_tf AS merge(\'default\', \'.*\')
|
||||
CREATE TABLE default.merge_tf\n(\n `n` Int8\n) AS merge(\'default\', \'.*\')
|
||||
CREATE TABLE default.distributed\n(\n `n` Int8\n)\nENGINE = Distributed(\'test_shard_localhost\', \'default\', \'file\')
|
||||
CREATE TABLE default.distributed_tf AS cluster(\'test_shard_localhost\', \'default\', \'buffer\')
|
||||
CREATE TABLE default.distributed_tf\n(\n `n` Int8\n) AS cluster(\'test_shard_localhost\', \'default\', \'buffer\')
|
||||
CREATE TABLE default.url\n(\n `n` UInt64,\n `col` String\n)\nENGINE = URL(\'https://localhost:8443/?query=select+n,+_table+from+default.merge+format+CSV\', \'CSV\')
|
||||
CREATE TABLE default.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'default\', \'view\')))
|
||||
CREATE TABLE default.rich_syntax\n(\n `n` Int64\n) AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'default\', \'view\')))
|
||||
CREATE VIEW default.view\n(\n `n` Int64\n) AS\nSELECT toInt64(n) AS n\nFROM \n(\n SELECT toString(n) AS n\n FROM default.merge\n WHERE _table != \'qwerty\'\n ORDER BY _table ASC\n)\nUNION ALL\nSELECT *\nFROM default.file
|
||||
CREATE DICTIONARY default.dict\n(\n `n` UInt64,\n `col` String DEFAULT \'42\'\n)\nPRIMARY KEY n\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT 9440 SECURE 1 USER \'default\' TABLE \'url\'))\nLIFETIME(MIN 0 MAX 1)\nLAYOUT(CACHE(SIZE_IN_CELLS 1))
|
||||
16
|
||||
|
@ -0,0 +1,10 @@
|
||||
CREATE TABLE test_01457.tf_remote\n(\n `n` Int8\n) AS remote(\'localhost\', \'default\', \'tmp\')
|
||||
CREATE TABLE test_01457.tf_remote_explicit_structure\n(\n `n` UInt64\n) AS remote(\'localhost\', \'default\', \'tmp\')
|
||||
CREATE TABLE test_01457.tf_numbers\n(\n `number` String\n) AS numbers(1)
|
||||
CREATE TABLE test_01457.tf_merge\n(\n `n` Int8\n) AS merge(\'default\', \'tmp\')
|
||||
42
|
||||
0 Int8
|
||||
0 Int8
|
||||
0 UInt64
|
||||
0 String
|
||||
0 Int8
|
@ -0,0 +1,33 @@
|
||||
DROP DATABASE IF EXISTS test_01457;
|
||||
|
||||
CREATE DATABASE test_01457;
|
||||
|
||||
CREATE TABLE tmp (n Int8) ENGINE=Memory;
|
||||
|
||||
CREATE TABLE test_01457.tf_remote AS remote('localhost', currentDatabase(), 'tmp');
|
||||
SHOW CREATE TABLE test_01457.tf_remote;
|
||||
CREATE TABLE test_01457.tf_remote_explicit_structure (n UInt64) AS remote('localhost', currentDatabase(), 'tmp');
|
||||
SHOW CREATE TABLE test_01457.tf_remote_explicit_structure;
|
||||
CREATE TABLE test_01457.tf_numbers (number String) AS numbers(1);
|
||||
SHOW CREATE TABLE test_01457.tf_numbers;
|
||||
CREATE TABLE test_01457.tf_merge AS merge(currentDatabase(), 'tmp');
|
||||
SHOW CREATE TABLE test_01457.tf_merge;
|
||||
|
||||
DROP TABLE tmp;
|
||||
|
||||
DETACH DATABASE test_01457;
|
||||
ATTACH DATABASE test_01457;
|
||||
|
||||
CREATE TABLE tmp (n Int8) ENGINE=Memory;
|
||||
INSERT INTO test_01457.tf_remote_explicit_structure VALUES ('42');
|
||||
SELECT * FROM tmp;
|
||||
TRUNCATE TABLE tmp;
|
||||
INSERT INTO test_01457.tf_remote VALUES (0);
|
||||
|
||||
SELECT (*,).1 AS c, toTypeName(c) FROM tmp;
|
||||
SELECT (*,).1 AS c, toTypeName(c) FROM test_01457.tf_remote;
|
||||
SELECT (*,).1 AS c, toTypeName(c) FROM test_01457.tf_remote_explicit_structure;
|
||||
SELECT (*,).1 AS c, toTypeName(c) FROM test_01457.tf_numbers;
|
||||
SELECT (*,).1 AS c, toTypeName(c) FROM test_01457.tf_merge;
|
||||
|
||||
DROP DATABASE test_01457;
|
@ -1,7 +1,6 @@
|
||||
CREATE TABLE default.table_from_remote AS remote(\'localhost\', \'system\', \'numbers\')
|
||||
CREATE TABLE default.table_from_remote AS remote(\'localhost\', \'system\', \'numbers\')
|
||||
CREATE TABLE default.table_from_numbers AS numbers(1000)
|
||||
CREATE TABLE default.table_from_numbers AS numbers(1000)
|
||||
CREATE TABLE default.table_from_remote\n(\n `number` UInt64\n) AS remote(\'localhost\', \'system\', \'numbers\')
|
||||
CREATE TABLE default.table_from_remote\n(\n `number` UInt64,\n `col` UInt8\n) AS remote(\'localhost\', \'system\', \'numbers\')
|
||||
CREATE TABLE default.table_from_numbers\n(\n `number` UInt64\n) AS numbers(1000)
|
||||
CREATE TABLE default.table_from_numbers\n(\n `number` UInt64\n) AS numbers(1000)
|
||||
CREATE TABLE default.table_from_select\n(\n `number` UInt64\n)\nENGINE = MergeTree()\nORDER BY tuple()\nSETTINGS index_granularity = 8192
|
||||
CREATE TABLE default.table_from_select\n(\n `number` UInt64,\n `col` UInt8\n)\nENGINE = MergeTree()\nORDER BY tuple()\nSETTINGS index_granularity = 8192
|
||||
1
|
||||
|
@ -6,7 +6,7 @@ CREATE TABLE table_from_remote AS remote('localhost', 'system', 'numbers');
|
||||
|
||||
SHOW CREATE TABLE table_from_remote;
|
||||
|
||||
ALTER TABLE table_from_remote ADD COLUMN col UInt8; --{serverError 48}
|
||||
ALTER TABLE table_from_remote ADD COLUMN col UInt8;
|
||||
|
||||
SHOW CREATE TABLE table_from_remote;
|
||||
|
||||
@ -26,8 +26,6 @@ ALTER TABLE table_from_select ADD COLUMN col UInt8;
|
||||
|
||||
SHOW CREATE TABLE table_from_select;
|
||||
|
||||
SELECT 1;
|
||||
|
||||
DROP TABLE IF EXISTS table_from_remote;
|
||||
DROP TABLE IF EXISTS table_from_select;
|
||||
DROP TABLE IF EXISTS table_from_numbers;
|
||||
|
@ -141,6 +141,7 @@
|
||||
01460_DistributedFilesToInsert
|
||||
01474_executable_dictionary
|
||||
01474_bad_global_join
|
||||
01457_create_as_table_function_structure
|
||||
01473_event_time_microseconds
|
||||
01461_query_start_time_microseconds
|
||||
01455_shard_leaf_max_rows_bytes_to_read
|
||||
|
Loading…
Reference in New Issue
Block a user