Merge branch 'master' into fix-parallel-replicas-multiply-result

Igor Nikonov 2023-08-03 19:10:37 +02:00 committed by GitHub
commit 5ef0cd9646
19 changed files with 119 additions and 105 deletions

View File

@@ -1650,6 +1650,7 @@ try
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
maybeConvertSystemDatabase(global_context);
startupSystemTables();
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
@@ -1668,7 +1669,6 @@ try
/// Then, load remaining databases
loadMetadata(global_context, default_database);
convertDatabasesEnginesIfNeed(global_context);
startupSystemTables();
database_catalog.startupBackgroundCleanup();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);

View File

@@ -107,9 +107,6 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m
{
cckMetadataPathForOrdinary(create, metadata_path);
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
DatabasePtr impl = getImpl(create, metadata_path, context);
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)

View File

@@ -77,6 +77,8 @@ DatabaseMySQL::DatabaseMySQL(
throw;
}
fs::create_directories(metadata_path);
thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
}

View File

@@ -54,6 +54,7 @@ DatabasePostgreSQL::DatabasePostgreSQL(
, cache_tables(cache_tables_)
, log(&Poco::Logger::get("DatabasePostgreSQL(" + dbname_ + ")"))
{
fs::create_directories(metadata_path);
cleaner_task = getContext()->getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); });
cleaner_task->deactivate();
}
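
The fs::create_directories call dropped from DatabaseFactory above is not gone: each engine that keeps on-disk metadata (DatabaseMySQL and DatabasePostgreSQL in this commit) now creates its own directory in its constructor. A minimal sketch of the new responsibility split, using a simplified stand-in type rather than the real classes:

#include <filesystem>
#include <string>

// Stand-in for an engine such as DatabaseMySQL or DatabasePostgreSQL:
// after this change, the engine itself guarantees its metadata directory
// exists, instead of relying on DatabaseFactory to pre-create it.
struct DatabaseEngineStub
{
    explicit DatabaseEngineStub(const std::string & metadata_path)
    {
        std::filesystem::create_directories(metadata_path);  // moved out of the factory
    }
};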

View File

@@ -250,6 +250,9 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
{
String path = context->getPath() + "metadata/" + database_name;
String metadata_file = path + ".sql";
if (fs::exists(metadata_file + ".tmp"))
fs::remove(metadata_file + ".tmp");
if (fs::exists(fs::path(metadata_file)))
{
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.
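
The new guard removes a stale temporary before loading. The likely source of such leftovers is the write-then-rename pattern used for metadata files: a crash between writing "<name>.sql.tmp" and renaming it into place leaves the ".tmp" behind. A hedged sketch of that pattern (writeMetadataAtomically is illustrative, not the actual ClickHouse helper):

#include <filesystem>
#include <fstream>
#include <string>

// Metadata is written to "<name>.sql.tmp" and then renamed into place,
// so a crash between the write and the rename leaves the ".tmp" file
// behind; the cleanup above removes it before loading.
void writeMetadataAtomically(const std::filesystem::path & metadata_file, const std::string & ddl)
{
    auto tmp_path = metadata_file;
    tmp_path += ".tmp";
    {
        std::ofstream out(tmp_path);
        out << ddl;                                        // write the full statement first
    }
    std::filesystem::rename(tmp_path, metadata_file);      // then atomically replace
}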

View File

@@ -42,8 +42,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
off_t file_offset = 0;
off_t read_until_position = 0;
std::optional<size_t> file_size;
off_t file_size;
explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
@@ -59,7 +58,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
, builder(createHDFSBuilder(hdfs_uri_, config_))
, read_settings(read_settings_)
, read_until_position(read_until_position_)
, file_size(file_size_)
{
fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
@@ -68,6 +66,22 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
"Unable to open HDFS file: {}. Error: {}",
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
if (file_size_.has_value())
{
file_size = file_size_.value();
}
else
{
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
{
hdfsCloseFile(fs.get(), fin);
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
}
file_size = static_cast<size_t>(file_info->mSize);
hdfsFreeFileInfo(file_info, 1);
}
}
~ReadBufferFromHDFSImpl() override
@@ -75,16 +89,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin);
}
size_t getFileSize()
size_t getFileSize() const
{
if (file_size)
return *file_size;
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
file_size = static_cast<size_t>(file_info->mSize);
return *file_size;
return file_size;
}
bool nextImpl() override
@@ -104,6 +111,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
num_bytes_to_read = internal_buffer.size();
}
if (file_size != 0 && file_offset >= file_size)
{
return false;
}
ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;
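
Taken together, the constructor now resolves the file size eagerly rather than lazily in getFileSize(): prefer the caller-supplied value, otherwise stat the file, otherwise throw (the real code also closes the HDFS handle before throwing). A self-contained sketch of that fallback order, with std::filesystem standing in for the libhdfs calls (hdfsGetPathInfo in the real code):

#include <filesystem>
#include <optional>
#include <stdexcept>
#include <string>
#include <system_error>

// Fallback order introduced above: (1) caller-provided size, (2) stat the
// file, (3) fail early. std::filesystem is a local stand-in for libhdfs.
size_t resolveFileSize(std::optional<size_t> provided, const std::string & path)
{
    if (provided.has_value())
        return provided.value();
    std::error_code ec;
    auto size = std::filesystem::file_size(path, ec);
    if (ec)
        throw std::runtime_error("Cannot find out file size for: " + path);
    return static_cast<size_t>(size);
}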

View File

@@ -262,6 +262,9 @@ struct SelectQueryInfo
// If limit is not 0, that means it's a trivial limit query.
UInt64 limit = 0;
/// For IStorageSystemOneBlock
std::vector<UInt8> columns_mask;
InputOrderInfoPtr getInputOrderInfo() const
{
return input_order_info ? input_order_info : (projection ? projection->input_order_info : nullptr);

View File

@@ -4,6 +4,8 @@
#include <DataTypes/DataTypeString.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
@@ -30,6 +32,8 @@ class IStorageSystemOneBlock : public IStorage
protected:
virtual void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const = 0;
virtual bool supportsColumnsMask() const { return false; }
public:
explicit IStorageSystemOneBlock(const StorageID & table_id_) : IStorage(table_id_)
{
@@ -48,8 +52,15 @@ public:
size_t /*num_streams*/) override
{
storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlockWithVirtuals(getVirtuals());
if (supportsColumnsMask())
{
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
query_info.columns_mask = std::move(columns_mask);
sample_block = std::move(header);
}
MutableColumns res_columns = sample_block.cloneEmptyColumns();
fillData(res_columns, context, query_info);

View File

@@ -10,6 +10,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Parsers/queryToString.h>
#include <Access/ContextAccess.h>
#include <Databases/IDatabase.h>
@@ -315,23 +316,9 @@ Pipe StorageSystemColumns::read(
const size_t /*num_streams*/)
{
storage_snapshot->check(column_names);
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
Block block_to_filter;
Storages storages;

View File

@@ -5,6 +5,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Databases/IDatabase.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTIndexDeclaration.h>
@@ -185,21 +186,9 @@ Pipe StorageSystemDataSkippingIndices::read(
size_t /* num_streams */)
{
storage_snapshot->check(column_names);
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
MutableColumnPtr column = ColumnString::create();

View File

@@ -117,13 +117,23 @@ void StorageSystemDatabases::fillData(MutableColumns & res_columns, ContextPtr c
const auto & database = databases.at(database_name);
res_columns[0]->insert(database_name);
res_columns[1]->insert(database->getEngineName());
res_columns[2]->insert(context->getPath() + database->getDataPath());
res_columns[3]->insert(database->getMetadataPath());
res_columns[4]->insert(database->getUUID());
res_columns[5]->insert(getEngineFull(context, database));
res_columns[6]->insert(database->getDatabaseComment());
size_t src_index = 0;
size_t res_index = 0;
const auto & columns_mask = query_info.columns_mask;
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database_name);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getEngineName());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(context->getPath() + database->getDataPath());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getMetadataPath());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getUUID());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(getEngineFull(context, database));
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getDatabaseComment());
}
}

View File

@@ -26,6 +26,8 @@ public:
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
bool supportsColumnsMask() const override { return true; }
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
};

View File

@@ -6,6 +6,7 @@
#include <Storages/IStorage.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/System/StorageSystemPartsBase.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <IO/SharedThreadPools.h>
@@ -81,13 +82,11 @@ struct WorkerState
class DetachedPartsSource : public ISource
{
public:
DetachedPartsSource(Block header_, std::shared_ptr<SourceState> state_, std::vector<UInt8> columns_mask_, UInt64 block_size_,
bool has_bytes_on_disk_column_)
DetachedPartsSource(Block header_, std::shared_ptr<SourceState> state_, std::vector<UInt8> columns_mask_, UInt64 block_size_)
: ISource(std::move(header_))
, state(state_)
, columns_mask(std::move(columns_mask_))
, block_size(block_size_)
, has_bytes_on_disk_column(has_bytes_on_disk_column_)
{}
String getName() const override { return "DataPartsSource"; }
@@ -127,7 +126,6 @@ private:
std::shared_ptr<SourceState> state;
const std::vector<UInt8> columns_mask;
const UInt64 block_size;
const bool has_bytes_on_disk_column;
const size_t support_threads = 35;
StoragesInfo current_info;
@@ -149,9 +147,6 @@ private:
void calculatePartSizeOnDisk(size_t begin, std::vector<std::atomic<size_t>> & parts_sizes)
{
if (!has_bytes_on_disk_column)
return;
WorkerState worker_state;
for (auto p_id = begin; p_id < detached_parts.size(); ++p_id)
@@ -211,7 +206,9 @@ private:
auto begin = detached_parts.size() - rows;
std::vector<std::atomic<size_t>> parts_sizes(rows);
calculatePartSizeOnDisk(begin, parts_sizes);
constexpr size_t bytes_on_disk_col_idx = 4;
if (columns_mask[bytes_on_disk_col_idx])
calculatePartSizeOnDisk(begin, parts_sizes);
for (auto p_id = begin; p_id < detached_parts.size(); ++p_id)
{
@@ -229,7 +226,7 @@ private:
new_columns[res_index++]->insert(p.dir_name);
if (columns_mask[src_index++])
{
chassert(has_bytes_on_disk_column);
chassert(src_index - 1 == bytes_on_disk_col_idx);
size_t bytes_on_disk = parts_sizes.at(p_id - begin).load();
new_columns[res_index++]->insert(bytes_on_disk);
}
@@ -285,21 +282,7 @@ Pipe StorageSystemDetachedParts::read(
storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlock();
NameSet names_set(column_names.begin(), column_names.end());
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0; i < columns_mask.size(); ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
bool has_bytes_on_disk_column = names_set.contains("bytes_on_disk");
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
auto state = std::make_shared<SourceState>(StoragesInfoStream(query_info, context));
@@ -307,7 +290,7 @@ Pipe StorageSystemDetachedParts::read(
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<DetachedPartsSource>(header.cloneEmpty(), state, columns_mask, max_block_size, has_bytes_on_disk_column);
auto source = std::make_shared<DetachedPartsSource>(header.cloneEmpty(), state, columns_mask, max_block_size);
pipe.addSource(std::move(source));
}
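
The dedicated has_bytes_on_disk_column flag is gone because the columns mask already carries that fact. A tiny sketch of the equivalence, assuming, as the diff hard-codes, that bytes_on_disk sits at position 4 of the sample block:

#include <cstddef>
#include <cstdint>
#include <vector>

// Before: a separate bool computed from the queried names.
// After: read the same fact straight from the positional mask.
bool wantsBytesOnDisk(const std::vector<uint8_t> & columns_mask)
{
    constexpr std::size_t bytes_on_disk_col_idx = 4;  // fixed position in the sample block
    return columns_mask.size() > bytes_on_disk_col_idx
        && columns_mask[bytes_on_disk_col_idx] != 0;
}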

View File

@@ -9,6 +9,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMaterializedMySQL.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Access/ContextAccess.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
@@ -254,21 +255,10 @@ Pipe StorageSystemPartsBase::read(
StoragesInfoStream stream(query_info, context);
/// Create the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample.columns());
for (size_t i = 0; i < sample.columns(); ++i)
{
if (names_set.contains(sample.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample.getByPosition(i));
}
}
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample, column_names);
MutableColumns res_columns = header.cloneEmptyColumns();
if (has_state_column)
res_columns.push_back(ColumnString::create());

View File

@@ -3,6 +3,7 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/VirtualColumnUtils.h>
@@ -587,23 +588,9 @@ Pipe StorageSystemTables::read(
const size_t /*num_streams*/)
{
storage_snapshot->check(column_names);
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block res_block;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
res_block.insert(sample_block.getByPosition(i));
}
}
auto [columns_mask, res_block] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info, context);
ColumnPtr filtered_tables_column = getFilteredTables(query_info.query, filtered_databases_column, context);

View File

@@ -0,0 +1,24 @@
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
namespace DB
{
std::pair<std::vector<UInt8>, Block> getQueriedColumnsMaskAndHeader(const Block & sample_block, const Names & column_names)
{
std::vector<UInt8> columns_mask(sample_block.columns());
Block header;
NameSet names_set(column_names.begin(), column_names.end());
for (size_t i = 0; i < columns_mask.size(); ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
return std::make_pair(columns_mask, header);
}
}

View File

@@ -0,0 +1,11 @@
#pragma once
#include <base/types.h>
#include <Core/Names.h>
#include <Core/Block.h>
namespace DB
{
std::pair<std::vector<UInt8>, Block> getQueriedColumnsMaskAndHeader(const Block & sample_block, const Names & column_names);
}
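
A self-contained toy of the helper's contract, using plain standard-library types in place of Block and Names: the mask is positional over the full sample block, and the returned header keeps the sample block's column order, not the query's. Consumers such as StorageSystemDatabases::fillData above rely on exactly this positional walk. All names here (toyMaskAndHeader, ToyNames) are illustrative:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <string>
#include <utility>
#include <vector>

using ToyNames = std::vector<std::string>;

// Mirrors getQueriedColumnsMaskAndHeader(): one mask slot per sample-block
// column, set to 1 when that column was queried; the header collects the
// matching columns in sample-block order.
std::pair<std::vector<uint8_t>, ToyNames>
toyMaskAndHeader(const ToyNames & sample_block, const ToyNames & column_names)
{
    std::set<std::string> names_set(column_names.begin(), column_names.end());
    std::vector<uint8_t> columns_mask(sample_block.size());
    ToyNames header;
    for (std::size_t i = 0; i < sample_block.size(); ++i)
    {
        if (names_set.count(sample_block[i]))
        {
            columns_mask[i] = 1;
            header.push_back(sample_block[i]);
        }
    }
    return {columns_mask, header};
}

int main()
{
    auto [mask, header] = toyMaskAndHeader({"name", "engine", "uuid"}, {"uuid", "name"});
    assert((mask == std::vector<uint8_t>{1, 0, 1}));
    assert((header == ToyNames{"name", "uuid"}));  // sample-block order, not query order
}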

View File

@@ -1,3 +1,4 @@
2
CREATE DATABASE test_01114_1\nENGINE = Atomic
CREATE DATABASE test_01114_2\nENGINE = Atomic
CREATE DATABASE test_01114_3\nENGINE = Ordinary

View File

@@ -13,6 +13,8 @@ DROP DATABASE IF EXISTS test_01114_2;
DROP DATABASE IF EXISTS test_01114_3;
"
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_1 ENGINE=Ordinary" 2>&1| grep -Fac "UNKNOWN_DATABASE_ENGINE"
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_1 ENGINE=Atomic"
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_2"
$CLICKHOUSE_CLIENT --allow_deprecated_database_ordinary=1 -q "CREATE DATABASE test_01114_3 ENGINE=Ordinary"