Merge pull request #52956 from ClickHouse/fixes_for_databases

Fix some issues with databases
This commit is contained in:
Alexander Tokmakov 2023-08-03 19:13:41 +03:00 committed by GitHub
commit 968c79cf3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 96 additions and 93 deletions

View File

@ -1650,6 +1650,7 @@ try
database_catalog.initializeAndLoadTemporaryDatabase(); database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context); loadMetadataSystem(global_context);
maybeConvertSystemDatabase(global_context); maybeConvertSystemDatabase(global_context);
startupSystemTables();
/// After attaching system databases we can initialize system log. /// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs(); global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded(); global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
@ -1668,7 +1669,6 @@ try
/// Then, load remaining databases /// Then, load remaining databases
loadMetadata(global_context, default_database); loadMetadata(global_context, default_database);
convertDatabasesEnginesIfNeed(global_context); convertDatabasesEnginesIfNeed(global_context);
startupSystemTables();
database_catalog.startupBackgroundCleanup(); database_catalog.startupBackgroundCleanup();
/// After loading validate that default database exists /// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database); database_catalog.assertDatabaseExists(default_database);

View File

@ -107,9 +107,6 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m
{ {
cckMetadataPathForOrdinary(create, metadata_path); cckMetadataPathForOrdinary(create, metadata_path);
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
DatabasePtr impl = getImpl(create, metadata_path, context); DatabasePtr impl = getImpl(create, metadata_path, context);
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries) if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)

View File

@ -77,6 +77,8 @@ DatabaseMySQL::DatabaseMySQL(
throw; throw;
} }
fs::create_directories(metadata_path);
thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this}; thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
} }

View File

@ -54,6 +54,7 @@ DatabasePostgreSQL::DatabasePostgreSQL(
, cache_tables(cache_tables_) , cache_tables(cache_tables_)
, log(&Poco::Logger::get("DatabasePostgreSQL(" + dbname_ + ")")) , log(&Poco::Logger::get("DatabasePostgreSQL(" + dbname_ + ")"))
{ {
fs::create_directories(metadata_path);
cleaner_task = getContext()->getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); }); cleaner_task = getContext()->getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); });
cleaner_task->deactivate(); cleaner_task->deactivate();
} }

View File

@ -250,6 +250,9 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
{ {
String path = context->getPath() + "metadata/" + database_name; String path = context->getPath() + "metadata/" + database_name;
String metadata_file = path + ".sql"; String metadata_file = path + ".sql";
if (fs::exists(metadata_file + ".tmp"))
fs::remove(metadata_file + ".tmp");
if (fs::exists(fs::path(metadata_file))) if (fs::exists(fs::path(metadata_file)))
{ {
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted. /// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.

View File

@ -262,6 +262,9 @@ struct SelectQueryInfo
// If limit is not 0, that means it's a trivial limit query. // If limit is not 0, that means it's a trivial limit query.
UInt64 limit = 0; UInt64 limit = 0;
/// For IStorageSystemOneBlock
std::vector<UInt8> columns_mask;
InputOrderInfoPtr getInputOrderInfo() const InputOrderInfoPtr getInputOrderInfo() const
{ {
return input_order_info ? input_order_info : (projection ? projection->input_order_info : nullptr); return input_order_info ? input_order_info : (projection ? projection->input_order_info : nullptr);

View File

@ -4,6 +4,8 @@
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
@ -30,6 +32,8 @@ class IStorageSystemOneBlock : public IStorage
protected: protected:
virtual void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const = 0; virtual void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const = 0;
virtual bool supportsColumnsMask() const { return false; }
public: public:
explicit IStorageSystemOneBlock(const StorageID & table_id_) : IStorage(table_id_) explicit IStorageSystemOneBlock(const StorageID & table_id_) : IStorage(table_id_)
{ {
@ -48,8 +52,15 @@ public:
size_t /*num_streams*/) override size_t /*num_streams*/) override
{ {
storage_snapshot->check(column_names); storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlockWithVirtuals(getVirtuals()); Block sample_block = storage_snapshot->metadata->getSampleBlockWithVirtuals(getVirtuals());
if (supportsColumnsMask())
{
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
query_info.columns_mask = std::move(columns_mask);
sample_block = std::move(header);
}
MutableColumns res_columns = sample_block.cloneEmptyColumns(); MutableColumns res_columns = sample_block.cloneEmptyColumns();
fillData(res_columns, context, query_info); fillData(res_columns, context, query_info);

View File

@ -10,6 +10,7 @@
#include <DataTypes/DataTypeDateTime64.h> #include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Parsers/queryToString.h> #include <Parsers/queryToString.h>
#include <Access/ContextAccess.h> #include <Access/ContextAccess.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
@ -315,23 +316,9 @@ Pipe StorageSystemColumns::read(
const size_t /*num_streams*/) const size_t /*num_streams*/)
{ {
storage_snapshot->check(column_names); storage_snapshot->check(column_names);
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock(); Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample_block.columns()); auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
Block block_to_filter; Block block_to_filter;
Storages storages; Storages storages;

View File

@ -5,6 +5,7 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTIndexDeclaration.h> #include <Parsers/ASTIndexDeclaration.h>
@ -185,21 +186,9 @@ Pipe StorageSystemDataSkippingIndices::read(
size_t /* num_streams */) size_t /* num_streams */)
{ {
storage_snapshot->check(column_names); storage_snapshot->check(column_names);
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock(); Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample_block.columns()); auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
MutableColumnPtr column = ColumnString::create(); MutableColumnPtr column = ColumnString::create();

View File

@ -117,13 +117,23 @@ void StorageSystemDatabases::fillData(MutableColumns & res_columns, ContextPtr c
const auto & database = databases.at(database_name); const auto & database = databases.at(database_name);
res_columns[0]->insert(database_name); size_t src_index = 0;
res_columns[1]->insert(database->getEngineName()); size_t res_index = 0;
res_columns[2]->insert(context->getPath() + database->getDataPath()); const auto & columns_mask = query_info.columns_mask;
res_columns[3]->insert(database->getMetadataPath()); if (columns_mask[src_index++])
res_columns[4]->insert(database->getUUID()); res_columns[res_index++]->insert(database_name);
res_columns[5]->insert(getEngineFull(context, database)); if (columns_mask[src_index++])
res_columns[6]->insert(database->getDatabaseComment()); res_columns[res_index++]->insert(database->getEngineName());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(context->getPath() + database->getDataPath());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getMetadataPath());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getUUID());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(getEngineFull(context, database));
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getDatabaseComment());
} }
} }

View File

@ -26,6 +26,8 @@ public:
protected: protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock; using IStorageSystemOneBlock::IStorageSystemOneBlock;
bool supportsColumnsMask() const override { return true; }
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override; void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
}; };

View File

@ -6,6 +6,7 @@
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h> #include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/System/StorageSystemPartsBase.h> #include <Storages/System/StorageSystemPartsBase.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
#include <IO/SharedThreadPools.h> #include <IO/SharedThreadPools.h>
@ -81,13 +82,11 @@ struct WorkerState
class DetachedPartsSource : public ISource class DetachedPartsSource : public ISource
{ {
public: public:
DetachedPartsSource(Block header_, std::shared_ptr<SourceState> state_, std::vector<UInt8> columns_mask_, UInt64 block_size_, DetachedPartsSource(Block header_, std::shared_ptr<SourceState> state_, std::vector<UInt8> columns_mask_, UInt64 block_size_)
bool has_bytes_on_disk_column_)
: ISource(std::move(header_)) : ISource(std::move(header_))
, state(state_) , state(state_)
, columns_mask(std::move(columns_mask_)) , columns_mask(std::move(columns_mask_))
, block_size(block_size_) , block_size(block_size_)
, has_bytes_on_disk_column(has_bytes_on_disk_column_)
{} {}
String getName() const override { return "DataPartsSource"; } String getName() const override { return "DataPartsSource"; }
@ -127,7 +126,6 @@ private:
std::shared_ptr<SourceState> state; std::shared_ptr<SourceState> state;
const std::vector<UInt8> columns_mask; const std::vector<UInt8> columns_mask;
const UInt64 block_size; const UInt64 block_size;
const bool has_bytes_on_disk_column;
const size_t support_threads = 35; const size_t support_threads = 35;
StoragesInfo current_info; StoragesInfo current_info;
@ -149,9 +147,6 @@ private:
void calculatePartSizeOnDisk(size_t begin, std::vector<std::atomic<size_t>> & parts_sizes) void calculatePartSizeOnDisk(size_t begin, std::vector<std::atomic<size_t>> & parts_sizes)
{ {
if (!has_bytes_on_disk_column)
return;
WorkerState worker_state; WorkerState worker_state;
for (auto p_id = begin; p_id < detached_parts.size(); ++p_id) for (auto p_id = begin; p_id < detached_parts.size(); ++p_id)
@ -211,7 +206,9 @@ private:
auto begin = detached_parts.size() - rows; auto begin = detached_parts.size() - rows;
std::vector<std::atomic<size_t>> parts_sizes(rows); std::vector<std::atomic<size_t>> parts_sizes(rows);
calculatePartSizeOnDisk(begin, parts_sizes); constexpr size_t bytes_on_disk_col_idx = 4;
if (columns_mask[bytes_on_disk_col_idx])
calculatePartSizeOnDisk(begin, parts_sizes);
for (auto p_id = begin; p_id < detached_parts.size(); ++p_id) for (auto p_id = begin; p_id < detached_parts.size(); ++p_id)
{ {
@ -229,7 +226,7 @@ private:
new_columns[res_index++]->insert(p.dir_name); new_columns[res_index++]->insert(p.dir_name);
if (columns_mask[src_index++]) if (columns_mask[src_index++])
{ {
chassert(has_bytes_on_disk_column); chassert(src_index - 1 == bytes_on_disk_col_idx);
size_t bytes_on_disk = parts_sizes.at(p_id - begin).load(); size_t bytes_on_disk = parts_sizes.at(p_id - begin).load();
new_columns[res_index++]->insert(bytes_on_disk); new_columns[res_index++]->insert(bytes_on_disk);
} }
@ -285,21 +282,7 @@ Pipe StorageSystemDetachedParts::read(
storage_snapshot->check(column_names); storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlock(); Block sample_block = storage_snapshot->metadata->getSampleBlock();
NameSet names_set(column_names.begin(), column_names.end()); auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0; i < columns_mask.size(); ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
bool has_bytes_on_disk_column = names_set.contains("bytes_on_disk");
auto state = std::make_shared<SourceState>(StoragesInfoStream(query_info, context)); auto state = std::make_shared<SourceState>(StoragesInfoStream(query_info, context));
@ -307,7 +290,7 @@ Pipe StorageSystemDetachedParts::read(
for (size_t i = 0; i < num_streams; ++i) for (size_t i = 0; i < num_streams; ++i)
{ {
auto source = std::make_shared<DetachedPartsSource>(header.cloneEmpty(), state, columns_mask, max_block_size, has_bytes_on_disk_column); auto source = std::make_shared<DetachedPartsSource>(header.cloneEmpty(), state, columns_mask, max_block_size);
pipe.addSource(std::move(source)); pipe.addSource(std::move(source));
} }

View File

@ -9,6 +9,7 @@
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMaterializedMySQL.h> #include <Storages/StorageMaterializedMySQL.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Access/ContextAccess.h> #include <Access/ContextAccess.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
#include <Parsers/queryToString.h> #include <Parsers/queryToString.h>
@ -254,21 +255,10 @@ Pipe StorageSystemPartsBase::read(
StoragesInfoStream stream(query_info, context); StoragesInfoStream stream(query_info, context);
/// Create the result. /// Create the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample = storage_snapshot->metadata->getSampleBlock(); Block sample = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample.columns()); auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample, column_names);
for (size_t i = 0; i < sample.columns(); ++i)
{
if (names_set.contains(sample.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample.getByPosition(i));
}
}
MutableColumns res_columns = header.cloneEmptyColumns(); MutableColumns res_columns = header.cloneEmptyColumns();
if (has_state_column) if (has_state_column)
res_columns.push_back(ColumnString::create()); res_columns.push_back(ColumnString::create());

View File

@ -3,6 +3,7 @@
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <Storages/System/StorageSystemTables.h> #include <Storages/System/StorageSystemTables.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h> #include <Storages/SelectQueryInfo.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
@ -587,23 +588,9 @@ Pipe StorageSystemTables::read(
const size_t /*num_streams*/) const size_t /*num_streams*/)
{ {
storage_snapshot->check(column_names); storage_snapshot->check(column_names);
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock(); Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block res_block;
std::vector<UInt8> columns_mask(sample_block.columns()); auto [columns_mask, res_block] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
res_block.insert(sample_block.getByPosition(i));
}
}
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info, context); ColumnPtr filtered_databases_column = getFilteredDatabases(query_info, context);
ColumnPtr filtered_tables_column = getFilteredTables(query_info.query, filtered_databases_column, context); ColumnPtr filtered_tables_column = getFilteredTables(query_info.query, filtered_databases_column, context);

View File

@ -0,0 +1,24 @@
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
namespace DB
{
std::pair<std::vector<UInt8>, Block> getQueriedColumnsMaskAndHeader(const Block & sample_block, const Names & column_names)
{
std::vector<UInt8> columns_mask(sample_block.columns());
Block header;
NameSet names_set(column_names.begin(), column_names.end());
for (size_t i = 0; i < columns_mask.size(); ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
return std::make_pair(columns_mask, header);
}
}

View File

@ -0,0 +1,11 @@
#pragma once
#include <base/types.h>
#include <Core/Names.h>
#include <Core/Block.h>
namespace DB
{
std::pair<std::vector<UInt8>, Block> getQueriedColumnsMaskAndHeader(const Block & sample_block, const Names & column_names);
}

View File

@ -1,3 +1,4 @@
2
CREATE DATABASE test_01114_1\nENGINE = Atomic CREATE DATABASE test_01114_1\nENGINE = Atomic
CREATE DATABASE test_01114_2\nENGINE = Atomic CREATE DATABASE test_01114_2\nENGINE = Atomic
CREATE DATABASE test_01114_3\nENGINE = Ordinary CREATE DATABASE test_01114_3\nENGINE = Ordinary

View File

@ -13,6 +13,8 @@ DROP DATABASE IF EXISTS test_01114_2;
DROP DATABASE IF EXISTS test_01114_3; DROP DATABASE IF EXISTS test_01114_3;
" "
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_1 ENGINE=Ordinary" 2>&1| grep -Fac "UNKNOWN_DATABASE_ENGINE"
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_1 ENGINE=Atomic" $CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_1 ENGINE=Atomic"
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_2" $CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_2"
$CLICKHOUSE_CLIENT --allow_deprecated_database_ordinary=1 -q "CREATE DATABASE test_01114_3 ENGINE=Ordinary" $CLICKHOUSE_CLIENT --allow_deprecated_database_ordinary=1 -q "CREATE DATABASE test_01114_3 ENGINE=Ordinary"