Add schema inference to more table engines

This commit is contained in:
Nikolay Degterinsky 2023-05-19 00:44:27 +00:00
parent ed97e46d41
commit b8be714830
19 changed files with 207 additions and 55 deletions

View File

@ -176,7 +176,7 @@ StoragePtr DatabasePostgreSQL::tryGetTable(const String & table_name, ContextPtr
}
StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr, bool table_checked) const
StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr context_, bool table_checked) const
{
if (!cache_tables || !cached_tables.contains(table_name))
{
@ -191,7 +191,8 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr,
auto storage = std::make_shared<StoragePostgreSQL>(
StorageID(database_name, table_name), pool, table_name,
ColumnsDescription{columns_info->columns}, ConstraintsDescription{}, String{}, configuration.schema, configuration.on_conflict);
ColumnsDescription{columns_info->columns}, ConstraintsDescription{}, String{},
context_, configuration.schema, configuration.on_conflict);
if (cache_tables)
{

View File

@ -18,6 +18,7 @@
#include <Common/logger_useful.h>
#include <Common/parseAddress.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
namespace DB
{
@ -37,12 +38,27 @@ StorageMeiliSearch::StorageMeiliSearch(
: IStorage(table_id), config{config_}, log(&Poco::Logger::get("StorageMeiliSearch (" + table_id.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (columns_.empty())
{
auto columns = getTableStructureFromData(config);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription StorageMeiliSearch::getTableStructureFromData(const MeiliSearchConfiguration & config_)
{
MeiliSearchColumnDescriptionFetcher fetcher(config_);
fetcher.addParam(doubleQuoteString("limit"), "1");
return fetcher.fetchColumnsDescription();
}
String convertASTtoStr(ASTPtr ptr)
{
WriteBufferFromOwnString out;
@ -175,6 +191,7 @@ void registerStorageMeiliSearch(StorageFactory & factory)
return std::make_shared<StorageMeiliSearch>(args.table_id, config, args.columns, args.constraints, args.comment);
},
{
.supports_schema_inference = true,
.source_access_type = AccessType::MEILISEARCH,
});
}

View File

@ -28,7 +28,9 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) override;
MeiliSearchConfiguration static getConfiguration(ASTs engine_args, ContextPtr context);
static MeiliSearchConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
static ColumnsDescription getTableStructureFromData(const MeiliSearchConfiguration & config_);
private:
MeiliSearchConfiguration config;

View File

@ -170,7 +170,7 @@ void registerStorageExternalDistributed(StorageFactory & factory)
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
shards.insert(std::make_shared<StoragePostgreSQL>(
args.table_id, std::move(pool), configuration.table, args.columns, args.constraints, String{}));
args.table_id, std::move(pool), configuration.table, args.columns, args.constraints, String{}, context));
}
}
#endif

View File

@ -21,6 +21,7 @@
#include <Common/parseRemoteDescription.h>
#include <Common/logger_useful.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Databases/MySQL/FetchTablesColumnsList.h>
namespace DB
@ -65,12 +66,36 @@ StorageMySQL::StorageMySQL(
, log(&Poco::Logger::get("StorageMySQL (" + table_id_.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (columns_.empty())
{
auto columns = getTableStructureFromData(*pool, remote_database_name, remote_table_name, context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription StorageMySQL::getTableStructureFromData(
mysqlxx::PoolWithFailover & pool_,
const String & database,
const String & table,
const ContextPtr & context_)
{
const auto & settings = context_->getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(pool_, database, {table}, settings, settings.mysql_datatypes_support_level);
const auto columns = tables_and_columns.find(table);
if (columns == tables_and_columns.end())
throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {} doesn't exist.",
(database.empty() ? "" : (backQuote(database) + "." + backQuote(table))));
return columns->second;
}
Pipe StorageMySQL::read(
const Names & column_names_,
@ -354,6 +379,7 @@ void registerStorageMySQL(StorageFactory & factory)
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::MYSQL,
});
}

View File

@ -75,6 +75,12 @@ public:
const NamedCollection & named_collection, MySQLSettings & storage_settings,
ContextPtr context_, bool require_table = true);
static ColumnsDescription getTableStructureFromData(
mysqlxx::PoolWithFailover & pool_,
const String & database,
const String & table,
const ContextPtr & context_);
private:
friend class StorageMySQLSink;

View File

@ -43,6 +43,8 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
namespace DB
{
@ -60,6 +62,7 @@ StoragePostgreSQL::StoragePostgreSQL(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
const String & remote_table_schema_,
const String & on_conflict_)
: IStorage(table_id_)
@ -70,12 +73,36 @@ StoragePostgreSQL::StoragePostgreSQL(
, log(&Poco::Logger::get("StoragePostgreSQL (" + table_id_.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (columns_.empty())
{
auto columns = getTableStructureFromData(pool, remote_table_name, remote_table_schema, context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription StoragePostgreSQL::getTableStructureFromData(
const postgres::PoolWithFailoverPtr & pool_,
const String & table,
const String & schema,
const ContextPtr & context_)
{
const bool use_nulls = context_->getSettingsRef().external_table_functions_use_nulls;
auto connection_holder = pool_->get();
auto columns_info = fetchPostgreSQLTableStructure(
connection_holder->get(), table, schema, use_nulls).physical_columns;
if (!columns_info)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table structure not returned");
return ColumnsDescription{columns_info->columns};
}
Pipe StoragePostgreSQL::read(
const Names & column_names_,
@ -504,10 +531,12 @@ void registerStoragePostgreSQL(StorageFactory & factory)
args.columns,
args.constraints,
args.comment,
args.getContext(),
configuration.schema,
configuration.on_conflict);
},
{
.supports_schema_inference = true,
.source_access_type = AccessType::POSTGRES,
});
}

View File

@ -31,6 +31,7 @@ public:
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
const String & remote_table_schema_ = "",
const String & on_conflict = "");
@ -66,6 +67,12 @@ public:
static Configuration processNamedCollectionResult(const NamedCollection & named_collection, bool require_table = true);
static ColumnsDescription getTableStructureFromData(
const postgres::PoolWithFailoverPtr & pool_,
const String & table,
const String & schema,
const ContextPtr & context_);
private:
String remote_table_name;
String remote_table_schema;

View File

@ -4,6 +4,7 @@
#include <Common/logger_useful.h>
#include <Processors/Sources/SQLiteSource.h>
#include <Databases/SQLite/SQLiteUtils.h>
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
@ -44,12 +45,33 @@ StorageSQLite::StorageSQLite(
, log(&Poco::Logger::get("StorageSQLite (" + table_id_.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (columns_.empty())
{
auto columns = getTableStructureFromData(sqlite_db, remote_table_name);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription StorageSQLite::getTableStructureFromData(
const SQLitePtr & sqlite_db_,
const String & table)
{
auto columns = fetchSQLiteTableStructure(sqlite_db_.get(), table);
if (!columns)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR, "Failed to fetch table structure for {}", table);
return ColumnsDescription{*columns};
}
Pipe StorageSQLite::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -176,6 +198,7 @@ void registerStorageSQLite(StorageFactory & factory)
table_name, args.columns, args.constraints, args.getContext());
},
{
.supports_schema_inference = true,
.source_access_type = AccessType::SQLITE,
});
}

View File

@ -42,6 +42,10 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
static ColumnsDescription getTableStructureFromData(
const SQLitePtr & sqlite_db_,
const String & table);
private:
String remote_table_name;
String database_path;

View File

@ -1,6 +1,5 @@
#include <memory>
#include <Parsers/ASTFunction.h>
#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
#include <Storages/MeiliSearch/StorageMeiliSearch.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionMeiliSearch.h>
@ -9,19 +8,15 @@
namespace DB
{
StoragePtr TableFunctionMeiliSearch::executeImpl(
const ASTPtr & /* ast_function */, ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
const ASTPtr & /* ast_function */, ContextPtr /*context*/, const String & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
return std::make_shared<StorageMeiliSearch>(
StorageID(getDatabaseName(), table_name), configuration.value(), columns, ConstraintsDescription{}, String{});
StorageID(getDatabaseName(), table_name), configuration.value(), ColumnsDescription{}, ConstraintsDescription{}, String{});
}
ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */) const
{
MeiliSearchColumnDescriptionFetcher fetcher(configuration.value());
fetcher.addParam(doubleQuoteString("limit"), "1");
return fetcher.fetchColumnsDescription();
return StorageMeiliSearch::getTableStructureFromData(configuration.value());
}

View File

@ -1,7 +1,6 @@
#include "config.h"
#if USE_MYSQL
#include <Databases/MySQL/FetchTablesColumnsList.h>
#include <Processors/Sources/MySQLSource.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -16,7 +15,7 @@
#include <Common/quoteString.h>
#include "registerTableFunctions.h"
#include <Databases/MySQL/DatabaseMySQL.h> // for fetchTablesColumnsList
#include <Databases/MySQL/DatabaseMySQL.h>
#include <Common/parseRemoteDescription.h>
@ -61,15 +60,7 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const
{
const auto & settings = context->getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(*pool, configuration->database, {configuration->table}, settings, settings.mysql_datatypes_support_level);
const auto columns = tables_and_columns.find(configuration->table);
if (columns == tables_and_columns.end())
throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {} doesn't exist.",
(configuration->database.empty() ? "" : (backQuote(configuration->database) + "." + backQuote(configuration->table))));
return columns->second;
return StorageMySQL::getTableStructureFromData(*pool, configuration->database, configuration->table, context);
}
StoragePtr TableFunctionMySQL::executeImpl(
@ -78,8 +69,6 @@ StoragePtr TableFunctionMySQL::executeImpl(
const std::string & table_name,
ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto res = std::make_shared<StorageMySQL>(
StorageID(getDatabaseName(), table_name),
std::move(*pool),
@ -87,7 +76,7 @@ StoragePtr TableFunctionMySQL::executeImpl(
configuration->table,
configuration->replace_query,
configuration->on_duplicate_clause,
columns,
ColumnsDescription{},
ConstraintsDescription{},
String{},
context,

View File

@ -1,8 +1,6 @@
#include <TableFunctions/TableFunctionPostgreSQL.h>
#if USE_LIBPQXX
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <TableFunctions/ITableFunction.h>
@ -24,14 +22,14 @@ namespace ErrorCodes
StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto result = std::make_shared<StoragePostgreSQL>(
StorageID(getDatabaseName(), table_name),
connection_pool,
configuration->table,
columns,
ColumnsDescription{},
ConstraintsDescription{},
String{},
context,
configuration->schema,
configuration->on_conflict);
@ -42,15 +40,7 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr context) const
{
const bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls;
auto connection_holder = connection_pool->get();
auto columns_info = fetchPostgreSQLTableStructure(
connection_holder->get(), configuration->table, configuration->schema, use_nulls).physical_columns;
if (!columns_info)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table structure not returned");
return ColumnsDescription{columns_info->columns};
return StoragePostgreSQL::getTableStructureFromData(connection_pool, configuration->table, configuration->schema, context);
}

View File

@ -5,7 +5,6 @@
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#include <Databases/SQLite/SQLiteUtils.h>
#include "registerTableFunctions.h"
@ -33,13 +32,11 @@ namespace ErrorCodes
StoragePtr TableFunctionSQLite::executeImpl(const ASTPtr & /*ast_function*/,
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto storage = std::make_shared<StorageSQLite>(StorageID(getDatabaseName(), table_name),
sqlite_db,
database_path,
remote_table_name,
columns, ConstraintsDescription{}, context);
ColumnsDescription{}, ConstraintsDescription{}, context);
storage->startup();
return storage;
@ -48,12 +45,7 @@ StoragePtr TableFunctionSQLite::executeImpl(const ASTPtr & /*ast_function*/,
ColumnsDescription TableFunctionSQLite::getActualTableStructure(ContextPtr /* context */) const
{
auto columns = fetchSQLiteTableStructure(sqlite_db.get(), remote_table_name);
if (!columns)
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR, "Failed to fetch table structure for {}", remote_table_name);
return ColumnsDescription{*columns};
return StorageSQLite::getTableStructureFromData(sqlite_db, remote_table_name);
}

View File

@ -57,10 +57,12 @@ def test_simple_select(started_cluster):
push_data(client, table, data)
parameters = "'http://meili1:7700', 'new_table', ''"
node = started_cluster.instances["meili"]
node.query("DROP TABLE IF EXISTS simple_meili_table")
node.query(
"CREATE TABLE simple_meili_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili1:7700', 'new_table', '')"
f"CREATE TABLE simple_meili_table(id UInt64, data String) ENGINE = MeiliSearch({parameters})"
)
assert node.query("SELECT COUNT() FROM simple_meili_table") == "100\n"
@ -73,7 +75,25 @@ def test_simple_select(started_cluster):
node.query("SELECT data FROM simple_meili_table WHERE id = 42")
== hex(42 * 42) + "\n"
)
node.query(
f"CREATE TABLE simple_meili_table_auto_schema_engine ENGINE=MeiliSearch({parameters})"
)
node.query(
f"CREATE TABLE simple_meili_table_auto_schema_function AS meilisearch({parameters})"
)
expected = "id\tInt64\t\t\t\t\t\ndata\tString\t\t\t\t\t\n"
assert (
node.query("DESCRIBE TABLE simple_meili_table_auto_schema_engine") == expected
)
assert (
node.query("DESCRIBE TABLE simple_meili_table_auto_schema_function") == expected
)
node.query("DROP TABLE simple_meili_table")
node.query("DROP TABLE simple_meili_table_auto_schema_engine")
node.query("DROP TABLE simple_meili_table_auto_schema_function")
table.delete()

View File

@ -307,6 +307,32 @@ def test_table_function(started_cluster):
conn.close()
def test_schema_inference(started_cluster):
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, "inference_table")
with conn.cursor() as cursor:
cursor.execute(
"CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)"
)
parameters = "'mysql57:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'"
node1.query(
f"CREATE TABLE mysql_schema_inference_engine ENGINE=MySQL({parameters})"
)
node1.query(f"CREATE TABLE mysql_schema_inference_function AS mysql({parameters})")
expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n"
assert node1.query("DESCRIBE TABLE mysql_schema_inference_engine") == expected
assert node1.query("DESCRIBE TABLE mysql_schema_inference_function") == expected
node1.query("DROP TABLE mysql_schema_inference_engine")
node1.query("DROP TABLE mysql_schema_inference_function")
drop_mysql_table(conn, "inference_table")
def test_binary_type(started_cluster):
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, "binary_type")
@ -329,6 +355,7 @@ def test_binary_type(started_cluster):
node1.query("SELECT * FROM {}".format(table_function))
== "42\tclickhouse\\0\\0\\0\\0\\0\\0\n"
)
drop_mysql_table(conn, "binary_type")
def test_enum_type(started_cluster):

View File

@ -198,7 +198,9 @@ def test_non_default_scema(started_cluster):
expected = node1.query("SELECT number FROM numbers(100)")
assert result == expected
table_function = """postgresql('postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_schema')"""
parameters = "'postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_schema'"
table_function = f"postgresql({parameters})"
table_engine = f"PostgreSQL({parameters})"
result = node1.query(f"SELECT * FROM {table_function}")
assert result == expected
@ -224,10 +226,19 @@ def test_non_default_scema(started_cluster):
expected = node1.query("SELECT number FROM numbers(200)")
assert result == expected
node1.query(f"CREATE TABLE test.test_pg_auto_schema_engine ENGINE={table_engine}")
node1.query(f"CREATE TABLE test.test_pg_auto_schema_function AS {table_function}")
expected = "a\tNullable(Int32)\t\t\t\t\t\n"
assert node1.query("DESCRIBE TABLE test.test_pg_auto_schema_engine") == expected
assert node1.query("DESCRIBE TABLE test.test_pg_auto_schema_function") == expected
cursor.execute("DROP SCHEMA test_schema CASCADE")
cursor.execute('DROP SCHEMA "test.nice.schema" CASCADE')
node1.query("DROP TABLE test.test_pg_table_schema")
node1.query("DROP TABLE test.test_pg_table_schema_with_dots")
node1.query("DROP TABLE test.test_pg_auto_schema_engine")
node1.query("DROP TABLE test.test_pg_auto_schema_function")
def test_concurrent_queries(started_cluster):

View File

@ -36,6 +36,11 @@ line1 1
line2 2
line3 3
line4 4
test schema inference
col1 Nullable(String)
col2 Nullable(Int32)
col1 Nullable(String)
col2 Nullable(Int32)
test path in clickhouse-local
line1 1
line2 2

View File

@ -87,6 +87,14 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO TABLE FUNCTION sqlite('${DB_PATH}', 't
${CLICKHOUSE_CLIENT} --query="SELECT * FROM sqlite('${DB_PATH}', 'table1') ORDER BY col2"
${CLICKHOUSE_CLIENT} --query="select 'test schema inference'";
${CLICKHOUSE_CLIENT} --query="CREATE TABLE sqlite_table3_inferred_engine ENGINE = SQLite('${DB_PATH}', 'table3')"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE sqlite_table3_inferred_function AS sqlite('${DB_PATH}', 'table3')"
${CLICKHOUSE_CLIENT} --query="DESCRIBE TABLE sqlite_table3_inferred_engine;"
${CLICKHOUSE_CLIENT} --query="DESCRIBE TABLE sqlite_table3_inferred_function;"
${CLICKHOUSE_CLIENT} --query="DROP TABLE sqlite_table3_inferred_engine;"
${CLICKHOUSE_CLIENT} --query="DROP TABLE sqlite_table3_inferred_function;"
sqlite3 "${DB_PATH2}" 'DROP TABLE IF EXISTS table1'
sqlite3 "${DB_PATH2}" 'CREATE TABLE table1 (col1 text, col2 smallint);'
sqlite3 "${DB_PATH2}" "INSERT INTO table1 VALUES ('line1', 1), ('line2', 2), ('line3', 3)"