diff --git a/dbms/src/AggregateFunctions/AggregateFunctionCount.h b/dbms/src/AggregateFunctions/AggregateFunctionCount.h index c903b03e675..5d39addff8b 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionCount.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionCount.h @@ -42,9 +42,6 @@ public: void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { - if (!place || !rhs) - throw Exception("nullptr", ErrorCodes::LOGICAL_ERROR); - data(place).count += data(rhs).count; } diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 4a253fc1895..9a2482530bb 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -370,6 +370,7 @@ namespace ErrorCodes extern const int QUERY_IS_PROHIBITED = 392; extern const int THERE_IS_NO_QUERY = 393; extern const int QUERY_WAS_CANCELLED = 394; + extern const int PTHREAD_ERROR = 395; extern const int KEEPER_EXCEPTION = 999; diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index c1a7d520acb..5731663b5c7 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -175,7 +175,7 @@ namespace CurrentMemoryTracker } } -DB::ActionBlockerSingleThread::BlockHolder getCurrentMemoryTrackerBlocker() +DB::ActionBlockerSingleThread::LockHolder getCurrentMemoryTrackerBlocker() { - return (DB::current_thread) ? DB::current_thread->memory_tracker.blocker.cancel() : DB::ActionBlockerSingleThread::BlockHolder(nullptr); + return (DB::current_thread) ? DB::current_thread->memory_tracker.blocker.cancel() : DB::ActionBlockerSingleThread::LockHolder(nullptr); } diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 573e829b6a3..3e32ce5c9e5 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -123,4 +123,4 @@ namespace CurrentMemoryTracker } -DB::ActionBlockerSingleThread::BlockHolder getCurrentMemoryTrackerBlocker(); +DB::ActionBlockerSingleThread::LockHolder getCurrentMemoryTrackerBlocker(); diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index 9c1882c38de..a2737a3925f 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -180,6 +180,13 @@ void Counters::reset() resetCounters(); } +void Counters::getPartiallyAtomicSnapshot(Counters & res) const +{ + for (Event i = 0; i < num_counters; ++i) + res.counters[i].store(counters[i], std::memory_order_relaxed); +} + + const char * getDescription(Event event) { static const char * descriptions[] = diff --git a/dbms/src/Common/ProfileEvents.h b/dbms/src/Common/ProfileEvents.h index 302e5be1884..174c2945435 100644 --- a/dbms/src/Common/ProfileEvents.h +++ b/dbms/src/Common/ProfileEvents.h @@ -29,7 +29,7 @@ namespace ProfileEvents { Counter * counters = nullptr; Counters * parent = nullptr; - const Level level = Level::Thread; + Level level = Level::Thread; std::unique_ptr counters_holder; Counters() = default; @@ -55,6 +55,8 @@ namespace ProfileEvents } while (current != nullptr); } + void getPartiallyAtomicSnapshot(Counters & res) const; + /// Reset metrics and parent void reset(); diff --git a/dbms/src/Common/ThreadStatus.cpp b/dbms/src/Common/ThreadStatus.cpp index 52e0a29f3c9..b52099845b0 100644 --- a/dbms/src/Common/ThreadStatus.cpp +++ b/dbms/src/Common/ThreadStatus.cpp @@ -5,6 +5,7 @@ #include #include +#include #include @@ -26,6 +27,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int PTHREAD_ERROR; } @@ -105,7 +107,7 @@ struct RusageCounters }; -struct ThreadStatus::Payload +struct ThreadStatus::Impl { RusageCounters last_rusage; }; @@ -118,13 +120,40 @@ struct ThreadStatus::Payload // LOG_DEBUG(thread_status->log, "Thread " << thread_status->poco_thread_number << " is finished"); //} +static pthread_once_t once_query_at_exit_callback = PTHREAD_ONCE_INIT; +static pthread_key_t tid_key_at_exit; + +static void thread_destructor(void * data) +{ + auto thread_status = static_cast(data); + thread_status->onExit(); + LOG_DEBUG(thread_status->log, "Destruct thread " << thread_status->poco_thread_number); + thread_status->thread_exited = true; +} + +static void thread_create_at_exit_key() { + if (0 != pthread_key_create(&tid_key_at_exit, thread_destructor)) + throw Exception("Failed pthread_key_create", ErrorCodes::PTHREAD_ERROR); +} + ThreadStatus::ThreadStatus() : poco_thread_number(Poco::ThreadNumber::get()), performance_counters(ProfileEvents::Level::Thread), log(&Poco::Logger::get("ThreadStatus")) { + impl = std::make_shared(); + LOG_DEBUG(log, "Thread " << poco_thread_number << " created"); + + if (0 != pthread_once(&once_query_at_exit_callback, thread_create_at_exit_key)) + throw Exception("Failed pthread_once", ErrorCodes::PTHREAD_ERROR); + + if (nullptr != pthread_getspecific(tid_key_at_exit)) + throw Exception("pthread_getspecific is already set", ErrorCodes::LOGICAL_ERROR); + + if (0 != pthread_setspecific(tid_key_at_exit, static_cast(this))) + throw Exception("Failed pthread_setspecific", ErrorCodes::PTHREAD_ERROR); } ThreadStatus::~ThreadStatus() @@ -134,52 +163,59 @@ ThreadStatus::~ThreadStatus() void ThreadStatus::init(QueryStatus * parent_query_, ProfileEvents::Counters * parent_counters, MemoryTracker * parent_memory_tracker) { - if (!initialized) + if (initialized) { if (auto counters_parent = performance_counters.parent) if (counters_parent != parent_counters) - LOG_WARNING(current_thread->log, "Parent performance counters are already set, overwrite"); + LOG_WARNING(log, "Parent performance counters are already set, overwrite"); if (auto tracker_parent = memory_tracker.getParent()) if (tracker_parent != parent_memory_tracker) - LOG_WARNING(current_thread->log, "Parent memory tracker is already set, overwrite"); + LOG_WARNING(log, "Parent memory tracker is already set, overwrite"); return; } - initialized = true; parent_query = parent_query_; performance_counters.parent = parent_counters; memory_tracker.setParent(parent_memory_tracker); memory_tracker.setDescription("(for thread)"); + initialized = true; + + /// Attach current thread to list of query threads + if (parent_query) + { + std::lock_guard lock(parent_query->threads_mutex); + auto res = parent_query->thread_statuses.emplace(current_thread->poco_thread_number, current_thread); + + if (!res.second && res.first->second.get() != current_thread.get()) + throw Exception("Thread " + std::to_string(current_thread->poco_thread_number) + " is set twice", ErrorCodes::LOGICAL_ERROR); + } onStart(); } void ThreadStatus::onStart() { - payload = std::make_shared(); - /// First init of thread rusage counters, set real time to zero, other metrics remain as is - payload->last_rusage.setFromCurrent(); - RusageCounters::incrementProfileEvents(payload->last_rusage, RusageCounters::zeros(payload->last_rusage.real_time)); + impl->last_rusage.setFromCurrent(); + RusageCounters::incrementProfileEvents(impl->last_rusage, RusageCounters::zeros(impl->last_rusage.real_time)); } void ThreadStatus::onExit() { - if (!initialized || !payload) - return; - - RusageCounters::updateProfileEvents(payload->last_rusage); + RusageCounters::updateProfileEvents(impl->last_rusage); } void ThreadStatus::reset() { + std::lock_guard lock(mutex); + + initialized = false; parent_query = nullptr; performance_counters.reset(); memory_tracker.reset(); memory_tracker.setParent(nullptr); - initialized = false; } @@ -190,19 +226,11 @@ void ThreadStatus::setCurrentThreadParentQuery(QueryStatus * parent_process) if (!parent_process) { - current_thread->init(parent_process, nullptr, nullptr); + current_thread->init(nullptr, nullptr, nullptr); return; } current_thread->init(parent_process, &parent_process->performance_counters, &parent_process->memory_tracker); - - { - std::lock_guard lock(parent_process->threads_mutex); - auto res = parent_process->thread_statuses.emplace(current_thread->poco_thread_number, current_thread); - - if (!res.second && res.first->second.get() != current_thread.get()) - throw Exception("Thread " + std::to_string(current_thread->poco_thread_number) + " is set twice", ErrorCodes::LOGICAL_ERROR); - } } void ThreadStatus::setCurrentThreadFromSibling(const ThreadStatusPtr & sibling_thread) @@ -213,11 +241,39 @@ void ThreadStatus::setCurrentThreadFromSibling(const ThreadStatusPtr & sibling_t if (sibling_thread == nullptr) throw Exception("Sibling thread was not initialized", ErrorCodes::LOGICAL_ERROR); + std::lock_guard lock(sibling_thread->mutex); current_thread->init(sibling_thread->parent_query, sibling_thread->performance_counters.parent, sibling_thread->memory_tracker.getParent()); } +struct ScopeCurrentThread +{ + ScopeCurrentThread() + { + if (!current_thread) + std::terminate(); // current_thread must be initialized + } + + ~ScopeCurrentThread() + { + if (!current_thread) + std::terminate(); // current_thread must be initialized + + if (Poco::ThreadNumber::get() != current_thread->poco_thread_number) + std::terminate(); // unexpected thread number + + current_thread->onExit(); + LOG_DEBUG(current_thread->log, "Thread " << current_thread->poco_thread_number << " is exiting"); + current_thread->thread_exited = true; + } +}; + thread_local ThreadStatusPtr current_thread = ThreadStatus::create(); +/// Order of current_thread and current_thread_scope matters +static thread_local ScopeCurrentThread current_thread_scope; + + + } diff --git a/dbms/src/Common/ThreadStatus.h b/dbms/src/Common/ThreadStatus.h index bce75b70779..ee0d2072b9c 100644 --- a/dbms/src/Common/ThreadStatus.h +++ b/dbms/src/Common/ThreadStatus.h @@ -3,6 +3,8 @@ #include #include #include +#include + namespace Poco { @@ -15,16 +17,18 @@ namespace DB struct QueryStatus; struct ThreadStatus; +struct ScopeCurrentThread; using ThreadStatusPtr = std::shared_ptr; -struct ThreadStatus : public ext::shared_ptr_helper, public std::enable_shared_from_this +struct ThreadStatus : public ext::shared_ptr_helper { UInt32 poco_thread_number = 0; QueryStatus * parent_query = nullptr; - ProfileEvents::Counters performance_counters; MemoryTracker memory_tracker; + bool thread_exited = false; + std::mutex mutex; void init(QueryStatus * parent_query_, ProfileEvents::Counters * parent_counters, MemoryTracker * parent_memory_tracker); void onStart(); @@ -38,14 +42,16 @@ struct ThreadStatus : public ext::shared_ptr_helper, public std::e ~ThreadStatus(); -protected: + friend struct ScopeCurrentThread; + +//protected: ThreadStatus(); bool initialized = false; Poco::Logger * log; - struct Payload; - std::shared_ptr payload; + struct Impl; + std::shared_ptr impl; }; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 48d7cab01af..a8e3f14cc18 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1275,10 +1275,9 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl( bool final, ThreadPool * thread_pool) const { - auto converter = [&](size_t bucket, const ThreadStatusPtr & main_thread) + auto converter = [&](size_t bucket, ThreadStatusPtr main_thread) { - if (main_thread) - ThreadStatus::setCurrentThreadFromSibling(main_thread); + ThreadStatus::setCurrentThreadFromSibling(main_thread); return convertOneBucketToBlock(data_variants, method, final, bucket); }; @@ -1555,7 +1554,7 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl( template -void NO_INLINE Aggregator:: mergeBucketImpl( +void NO_INLINE Aggregator::mergeBucketImpl( ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const { /// We connect all aggregation results to the first. @@ -1722,14 +1721,13 @@ private: if (max_scheduled_bucket_num >= NUM_BUCKETS) return; - parallel_merge_data->pool.schedule([this, main_thread=current_thread] () { thread(max_scheduled_bucket_num, main_thread); }); + parallel_merge_data->pool.schedule(std::bind(&MergingAndConvertingBlockInputStream::thread, this, + max_scheduled_bucket_num, current_thread)); } - void thread(Int32 bucket_num, const ThreadStatusPtr & main_thread) + void thread(Int32 bucket_num, ThreadStatusPtr main_thread) { - if (main_thread) - ThreadStatus::setCurrentThreadFromSibling(main_thread); - + ThreadStatus::setCurrentThreadFromSibling(main_thread); setThreadName("MergingAggregtd"); CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread}; @@ -2034,10 +2032,9 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV LOG_TRACE(log, "Merging partially aggregated two-level data."); - auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, const ThreadStatusPtr & main_thread) + auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadStatusPtr main_thread) { - if (main_thread) - ThreadStatus::setCurrentThreadFromSibling(main_thread); + ThreadStatus::setCurrentThreadFromSibling(main_thread); for (Block & block : bucket_to_blocks[bucket]) { diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index faf90489aeb..9a71ceda434 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -133,24 +133,17 @@ ProcessListEntry::~ProcessListEntry() /// Finalize all threads statuses { + current_thread->onExit(); + std::lock_guard lock(it->threads_mutex); for (auto & elem : it->thread_statuses) { auto & thread_status = elem.second; - thread_status->onExit(); thread_status->reset(); - thread_status.reset(); - } - - it->thread_statuses.clear(); + }; } - /// Also reset query master thread status - /// NOTE: we can't destroy it, since master threads are selected from fixed thread pool - if (current_thread) - current_thread->reset(); - std::lock_guard lock(parent.mutex); /// The order of removing memory_trackers is important. diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index c598c9c2b5d..1b2f8c63552 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -59,6 +59,10 @@ struct QueryStatusInfo size_t written_bytes; Int64 memory_usage; ClientInfo client_info; + + /// Optional fields, filled by request + std::vector thread_numbers; + std::unique_ptr profile_counters; }; @@ -81,7 +85,8 @@ struct QueryStatus MemoryTracker memory_tracker; mutable std::shared_mutex threads_mutex; - using QueryThreadStatuses = std::map; /// Key is Poco's thread_id + /// Key is Poco's thread_id + using QueryThreadStatuses = std::map; QueryThreadStatuses thread_statuses; CurrentMetrics::Increment num_queries_increment{CurrentMetrics::Query}; @@ -264,16 +269,35 @@ public: size_t size() const { return processes.size(); } /// Get current state of process list. - Info getInfo() const + Info getInfo(bool get_thread_list = false, bool get_profile_events = false) const { + Info per_query_infos; + std::lock_guard lock(mutex); - Info res; - res.reserve(processes.size()); - for (const auto & elem : processes) - res.emplace_back(elem.getInfo()); + per_query_infos.reserve(processes.size()); + for (const auto & process : processes) + { + per_query_infos.emplace_back(process.getInfo()); + QueryStatusInfo & current_info = per_query_infos.back(); - return res; + if (get_thread_list) + { + std::lock_guard lock(process.threads_mutex); + current_info.thread_numbers.reserve(process.thread_statuses.size()); + + for (auto & thread_status_elem : process.thread_statuses) + current_info.thread_numbers.emplace_back(thread_status_elem.second->poco_thread_number); + } + + if (get_profile_events) + { + current_info.profile_counters = std::make_unique(ProfileEvents::Level::Process); + process.performance_counters.getPartiallyAtomicSnapshot(*current_info.profile_counters); + } + } + + return per_query_infos; } void setMaxSize(size_t max_size_) diff --git a/dbms/src/Interpreters/SystemLog.h b/dbms/src/Interpreters/SystemLog.h index faba63bc2ad..6d970a47f71 100644 --- a/dbms/src/Interpreters/SystemLog.h +++ b/dbms/src/Interpreters/SystemLog.h @@ -56,7 +56,7 @@ class QueryLog; class PartLog; -/// System logs should be destroyed in destructor of last Context and before tables, +/// System logs should be destroyed in destructor of the last Context and before tables, /// because SystemLog destruction makes insert query while flushing data into underlying tables struct SystemLogs { diff --git a/dbms/src/Storages/System/StorageSystemProcesses.cpp b/dbms/src/Storages/System/StorageSystemProcesses.cpp index af02417af4f..4596ec583cb 100644 --- a/dbms/src/Storages/System/StorageSystemProcesses.cpp +++ b/dbms/src/Storages/System/StorageSystemProcesses.cpp @@ -1,10 +1,13 @@ #include #include #include +#include +#include #include #include #include #include +#include namespace DB @@ -50,6 +53,18 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_) { "memory_usage", std::make_shared() }, { "query", std::make_shared() } }; + + virtual_columns = ColumnsWithTypeAndName{ + { + std::make_shared(std::make_shared()), + "thread_numbers" + }, + { + std::make_shared(std::make_shared( + DataTypes{std::make_shared(), std::make_shared()})), + "profile_counters" + } + }; } @@ -61,12 +76,18 @@ BlockInputStreams StorageSystemProcesses::read( const size_t /*max_block_size*/, const unsigned /*num_streams*/) { - check(column_names); processed_stage = QueryProcessingStage::FetchColumns; - ProcessList::Info info = context.getProcessList().getInfo(); + auto virtual_columns_processor = getVirtualColumnsProcessor(); + bool has_thread_numbers, has_profile_counters; + Names real_columns = virtual_columns_processor.process(column_names, {&has_thread_numbers, &has_profile_counters}); + check(real_columns); - MutableColumns res_columns = getSampleBlock().cloneEmptyColumns(); + Block res_block = getSampleBlock().cloneEmpty(); + virtual_columns_processor.appendVirtualColumns(res_block); + MutableColumns res_columns = res_block.cloneEmptyColumns(); + + ProcessList::Info info = context.getProcessList().getInfo(has_thread_numbers, has_profile_counters); for (const auto & process : info) { @@ -98,10 +119,38 @@ BlockInputStreams StorageSystemProcesses::read( res_columns[i++]->insert(UInt64(process.written_bytes)); res_columns[i++]->insert(process.memory_usage); res_columns[i++]->insert(process.query); + + if (has_thread_numbers) + { + Array thread_numbers; + thread_numbers.reserve(process.thread_numbers.size()); + + for (const UInt32 thread_number : process.thread_numbers) + thread_numbers.emplace_back(UInt64(thread_number)); + + res_columns[i++]->insert(std::move(thread_numbers)); + } + + if (has_profile_counters) + { + Array profile_counters; + profile_counters.reserve(ProfileEvents::Counters::num_counters); + + for (ProfileEvents::Event event = 0; event < ProfileEvents::Counters::num_counters; ++event) + { + Array name_and_counter{ + String(ProfileEvents::getDescription(event)), + UInt64((*process.profile_counters)[event].load(std::memory_order_relaxed)) + }; + + profile_counters.emplace_back(Tuple(std::move(name_and_counter))); + } + + res_columns[i++]->insert(std::move(profile_counters)); + } } - return BlockInputStreams(1, std::make_shared(getSampleBlock().cloneWithColumns(std::move(res_columns)))); + return BlockInputStreams(1, std::make_shared(res_block.cloneWithColumns(std::move(res_columns)))); } - } diff --git a/dbms/src/Storages/System/StorageSystemProcesses.h b/dbms/src/Storages/System/StorageSystemProcesses.h index f8f26d13d35..59e7fa91d16 100644 --- a/dbms/src/Storages/System/StorageSystemProcesses.h +++ b/dbms/src/Storages/System/StorageSystemProcesses.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -12,7 +13,7 @@ class Context; /** Implements `processes` system table, which allows you to get information about the queries that are currently executing. */ -class StorageSystemProcesses : public ext::shared_ptr_helper, public IStorage +class StorageSystemProcesses : public ext::shared_ptr_helper, public StorageWithVirtualColumns { public: std::string getName() const override { return "SystemProcesses"; } diff --git a/dbms/src/Storages/System/StorageSystemTables.cpp b/dbms/src/Storages/System/StorageSystemTables.cpp index 852392a63d1..00dcdc2f051 100644 --- a/dbms/src/Storages/System/StorageSystemTables.cpp +++ b/dbms/src/Storages/System/StorageSystemTables.cpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB @@ -21,95 +22,6 @@ namespace ErrorCodes extern const int CANNOT_GET_CREATE_TABLE_QUERY; } -/// Some virtual columns routines -namespace -{ - -bool hasColumn(const ColumnsWithTypeAndName & columns, const String & column_name) -{ - for (const auto & column : columns) - { - if (column.name == column_name) - return true; - } - - return false; -} - - -NameAndTypePair tryGetColumn(const ColumnsWithTypeAndName & columns, const String & column_name) -{ - for (const auto & column : columns) - { - if (column.name == column_name) - return {column.name, column.type}; - } - - return {}; -} - - -struct VirtualColumnsProcessor -{ - explicit VirtualColumnsProcessor(const ColumnsWithTypeAndName & all_virtual_columns_) - : all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0) {} - - /// Separates real and virtual column names, returns real ones - Names process(const Names & column_names, const std::vector & virtual_columns_exists_flag = {}) - { - Names real_column_names; - - if (!virtual_columns_exists_flag.empty()) - { - for (size_t i = 0; i < all_virtual_columns.size(); ++i) - *virtual_columns_exists_flag.at(i) = false; - } - - for (const String & column_name : column_names) - { - ssize_t virtual_column_index = -1; - - for (size_t i = 0; i < all_virtual_columns.size(); ++i) - { - if (column_name == all_virtual_columns[i].name) - { - virtual_column_index = i; - break; - } - } - - if (virtual_column_index >= 0) - { - auto index = static_cast(virtual_column_index); - virtual_columns_mask[index] = 1; - if (!virtual_columns_exists_flag.empty()) - *virtual_columns_exists_flag.at(index) = true; - } - else - { - real_column_names.emplace_back(column_name); - } - } - - return real_column_names; - } - - void appendVirtualColumns(Block & block) - { - for (size_t i = 0; i < all_virtual_columns.size(); ++i) - { - if (virtual_columns_mask[i]) - block.insert(all_virtual_columns[i].cloneEmpty()); - } - } - -protected: - const ColumnsWithTypeAndName & all_virtual_columns; - std::vector virtual_columns_mask; -}; - -} - StorageSystemTables::StorageSystemTables(const std::string & name_) : name(name_) @@ -160,7 +72,7 @@ BlockInputStreams StorageSystemTables::read( bool has_create_table_query = false; bool has_engine_full = false; - VirtualColumnsProcessor virtual_columns_processor(virtual_columns); + auto virtual_columns_processor = getVirtualColumnsProcessor(); real_column_names = virtual_columns_processor.process(column_names, {&has_metadata_modification_time, &has_create_table_query, &has_engine_full}); check(real_column_names); @@ -265,15 +177,4 @@ BlockInputStreams StorageSystemTables::read( return {std::make_shared(res_block)}; } -bool StorageSystemTables::hasColumn(const String & column_name) const -{ - return DB::hasColumn(virtual_columns, column_name) || ITableDeclaration::hasColumn(column_name); -} - -NameAndTypePair StorageSystemTables::getColumn(const String & column_name) const -{ - auto virtual_column = DB::tryGetColumn(virtual_columns, column_name); - return !virtual_column.name.empty() ? virtual_column : ITableDeclaration::getColumn(column_name); -} - } diff --git a/dbms/src/Storages/System/StorageSystemTables.h b/dbms/src/Storages/System/StorageSystemTables.h index fada738af5a..1e1cd4c83aa 100644 --- a/dbms/src/Storages/System/StorageSystemTables.h +++ b/dbms/src/Storages/System/StorageSystemTables.h @@ -2,7 +2,7 @@ #include #include - +#include namespace DB { @@ -12,7 +12,7 @@ class Context; /** Implements the system table `tables`, which allows you to get information about all tables. */ -class StorageSystemTables : public ext::shared_ptr_helper, public IStorage +class StorageSystemTables : public ext::shared_ptr_helper, public StorageWithVirtualColumns { public: std::string getName() const override { return "SystemTables"; } @@ -26,15 +26,9 @@ public: size_t max_block_size, unsigned num_streams) override; - bool hasColumn(const String & column_name) const override; - - NameAndTypePair getColumn(const String & column_name) const override; - private: const std::string name; - ColumnsWithTypeAndName virtual_columns; - protected: StorageSystemTables(const std::string & name_); }; diff --git a/dbms/src/Storages/System/VirtualColumnsProcessor.cpp b/dbms/src/Storages/System/VirtualColumnsProcessor.cpp new file mode 100644 index 00000000000..a126cc3f03a --- /dev/null +++ b/dbms/src/Storages/System/VirtualColumnsProcessor.cpp @@ -0,0 +1,95 @@ +#include "VirtualColumnsProcessor.h" + +namespace DB +{ + +bool hasColumn(const ColumnsWithTypeAndName & columns, const String & column_name) +{ + for (const auto & column : columns) + { + if (column.name == column_name) + return true; + } + + return false; +} + + +NameAndTypePair tryGetColumn(const ColumnsWithTypeAndName & columns, const String & column_name) +{ + for (const auto & column : columns) + { + if (column.name == column_name) + return {column.name, column.type}; + } + + return {}; +} + + +bool StorageWithVirtualColumns::hasColumnImpl(const ITableDeclaration * table, const String & column_name) const +{ + return DB::hasColumn(virtual_columns, column_name) || table->ITableDeclaration::hasColumn(column_name); +} + +NameAndTypePair StorageWithVirtualColumns::getColumnImpl(const ITableDeclaration * table, const String & column_name) const +{ + auto virtual_column = DB::tryGetColumn(virtual_columns, column_name); + return !virtual_column.name.empty() ? virtual_column : table->ITableDeclaration::getColumn(column_name); +} + +VirtualColumnsProcessor StorageWithVirtualColumns::getVirtualColumnsProcessor() +{ + return VirtualColumnsProcessor(virtual_columns); +} + + +Names VirtualColumnsProcessor::process(const Names & column_names, const std::vector & virtual_columns_exists_flag) +{ + Names real_column_names; + + if (!virtual_columns_exists_flag.empty()) + { + for (size_t i = 0; i < all_virtual_columns.size(); ++i) + *virtual_columns_exists_flag.at(i) = false; + } + + for (const String & column_name : column_names) + { + ssize_t virtual_column_index = -1; + + for (size_t i = 0; i < all_virtual_columns.size(); ++i) + { + if (column_name == all_virtual_columns[i].name) + { + virtual_column_index = i; + break; + } + } + + if (virtual_column_index >= 0) + { + auto index = static_cast(virtual_column_index); + virtual_columns_mask[index] = 1; + if (!virtual_columns_exists_flag.empty()) + *virtual_columns_exists_flag.at(index) = true; + } + else + { + real_column_names.emplace_back(column_name); + } + } + + return real_column_names; +} + +void VirtualColumnsProcessor::appendVirtualColumns(Block & block) +{ + for (size_t i = 0; i < all_virtual_columns.size(); ++i) + { + if (virtual_columns_mask[i]) + block.insert(all_virtual_columns[i].cloneEmpty()); + } +} + +} diff --git a/dbms/src/Storages/System/VirtualColumnsProcessor.h b/dbms/src/Storages/System/VirtualColumnsProcessor.h new file mode 100644 index 00000000000..af379bd223b --- /dev/null +++ b/dbms/src/Storages/System/VirtualColumnsProcessor.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include +#include +#include + + +/// Some virtual columns routines +namespace DB +{ + +struct VirtualColumnsProcessor; + + +/// Adaptor for storages having virtual columns +class StorageWithVirtualColumns : public IStorage +{ +public: + + bool hasColumn(const String & column_name) const override + { + return hasColumnImpl(this, column_name); + } + + NameAndTypePair getColumn(const String & column_name) const override + { + return getColumnImpl(this, column_name); + } + +protected: + + VirtualColumnsProcessor getVirtualColumnsProcessor(); + + ColumnsWithTypeAndName virtual_columns; + +protected: + + /// Use these methods instead of regular hasColumn / getColumn + bool hasColumnImpl(const ITableDeclaration * table, const String & column_name) const; + + NameAndTypePair getColumnImpl(const ITableDeclaration * table, const String & column_name) const; +}; + + +struct VirtualColumnsProcessor +{ + explicit VirtualColumnsProcessor(const ColumnsWithTypeAndName & all_virtual_columns_) + : all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0) {} + + /// Separates real and virtual column names, returns real ones + Names process(const Names & column_names, const std::vector & virtual_columns_exists_flag = {}); + + /// Append spevified virtual columns (with empty data) to the result block + void appendVirtualColumns(Block & block); + +protected: + const ColumnsWithTypeAndName & all_virtual_columns; + std::vector virtual_columns_mask; +}; + +} diff --git a/dbms/tests/queries/0_stateless/00385_storage_file_and_clickhouse-local_app.sh b/dbms/tests/queries/0_stateless/00385_storage_file_and_clickhouse-local_app.sh index d20094b1e61..858016efd91 100755 --- a/dbms/tests/queries/0_stateless/00385_storage_file_and_clickhouse-local_app.sh +++ b/dbms/tests/queries/0_stateless/00385_storage_file_and_clickhouse-local_app.sh @@ -25,7 +25,7 @@ function pack_unpack_compare() ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.buf" ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.buf_file" - rm -f "${CLICKHOUSE_TMP}/$buf_file" stderr + rm -f "$buf_file" stderr echo $((res_orig - res_db_file)) $((res_orig - res_ch_local1)) $((res_orig - res_ch_local2)) }