diff --git a/CMakeLists.txt b/CMakeLists.txt index be9d829ff62..49e817afff5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -151,6 +151,8 @@ if (MAKE_STATIC_LIBRARIES AND NOT APPLE AND NOT (CMAKE_CXX_COMPILER_ID STREQUAL set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++") endif () +set(THREADS_PREFER_PTHREAD_FLAG ON) + include (cmake/test_compiler.cmake) if (CMAKE_SYSTEM MATCHES "Linux" AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang") @@ -158,7 +160,7 @@ if (CMAKE_SYSTEM MATCHES "Linux" AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang") option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++ (only make sense on Linux with Clang)" ${HAVE_LIBCXX}) if (USE_LIBCXX) - set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++") # Ok for clang6, for older can cause 'not used option' worning + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++") # Ok for clang6, for older can cause 'not used option' warning set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_DEBUG=1") # More checks in debug build. if (MAKE_STATIC_LIBRARIES) link_libraries (-Wl,-Bstatic -stdlib=libc++ c++ c++abi -Wl,-Bdynamic) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 9ebb3c7c246..c6b0c03aad1 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -64,7 +64,7 @@ if (USE_INTERNAL_CCTZ_LIBRARY) add_subdirectory (cctz-cmake) endif () -if (ENABLE_LIBTCMALLOC AND USE_INTERNAL_GPERFTOOLS_LIBRARY) +if (ENABLE_TCMALLOC AND USE_INTERNAL_GPERFTOOLS_LIBRARY) add_subdirectory (libtcmalloc) endif () diff --git a/dbms/cmake/version.cmake b/dbms/cmake/version.cmake index ad598bf0660..6b358857b8c 100644 --- a/dbms/cmake/version.cmake +++ b/dbms/cmake/version.cmake @@ -1,6 +1,6 @@ # This strings autochanged from release_lib.sh: -set(VERSION_DESCRIBE v1.1.54312-testing) -set(VERSION_REVISION 54312) +set(VERSION_DESCRIBE v1.1.54314-testing) +set(VERSION_REVISION 54314) # end of autochange set (VERSION_MAJOR 1) diff --git a/dbms/src/Core/tests/CMakeLists.txt b/dbms/src/Core/tests/CMakeLists.txt index 2901b2ce5ac..f8cfe2eee2c 100644 --- a/dbms/src/Core/tests/CMakeLists.txt +++ b/dbms/src/Core/tests/CMakeLists.txt @@ -12,6 +12,7 @@ add_executable (move_field move_field.cpp) target_link_libraries (move_field clickhouse_common_io) add_executable (rvo_test rvo_test.cpp) +target_link_libraries (rvo_test Threads::Threads) add_executable (string_ref_hash string_ref_hash.cpp) target_link_libraries (string_ref_hash clickhouse_common_io) diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index 64a182b814e..226e8bf9409 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -62,12 +62,10 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(ColumnPlainPtrs & m } catch (...) { - desc.function->destroy(desc.state.data()); - desc.created = false; + desc.destroyState(); throw; } - desc.function->destroy(desc.state.data()); - desc.created = false; + desc.destroyState(); } else desc.merged_column->insertDefault(); @@ -123,8 +121,6 @@ Block SummingSortedBlockInputStream::readImpl() /// Additional initialization. if (current_row.empty()) { - auto & factory = AggregateFunctionFactory::instance(); - current_row.resize(num_columns); next_key.columns.resize(description.size()); @@ -178,12 +174,9 @@ Block SummingSortedBlockInputStream::readImpl() std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) { // Create aggregator to sum this column - auto desc = AggregateDescription{}; + AggregateDescription desc; desc.column_numbers = {i}; - desc.function = factory.get("sumWithOverflow", {column.type}); - desc.function->setArguments({column.type}); - desc.add_function = desc.function->getAddressOfAddFunction(); - desc.state.resize(desc.function->sizeOfData()); + desc.init("sumWithOverflow", {column.type}); columns_to_aggregate.emplace_back(std::move(desc)); } else @@ -218,8 +211,8 @@ Block SummingSortedBlockInputStream::readImpl() } DataTypes argument_types = {}; - auto desc = AggregateDescription{}; - auto map_desc = MapDescription{}; + AggregateDescription desc; + MapDescription map_desc; column_num_it = map.second.begin(); for (; column_num_it != map.second.end(); ++column_num_it) @@ -263,10 +256,7 @@ Block SummingSortedBlockInputStream::readImpl() if (map_desc.key_col_nums.size() == 1) { // Create summation for all value columns in the map - desc.function = factory.get("sumMap", argument_types); - desc.function->setArguments(argument_types); - desc.add_function = desc.function->getAddressOfAddFunction(); - desc.state.resize(desc.function->sizeOfData()); + desc.init("sumMap", argument_types); columns_to_aggregate.emplace_back(std::move(desc)); } else @@ -345,10 +335,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std: /// Reset aggregation states for next row for (auto & desc : columns_to_aggregate) - { - desc.function->create(desc.state.data()); - desc.created = true; - } + desc.createState(); // Start aggregations with current row addRow(current_row, current); diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index e4207daf88d..fba61bccd3d 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -4,6 +4,8 @@ #include #include #include +#include + namespace DB { @@ -79,12 +81,39 @@ private: std::vector state; bool created = false; + void init(const char * function_name, const DataTypes & argument_types) + { + function = AggregateFunctionFactory::instance().get(function_name, argument_types); + function->setArguments(argument_types); + add_function = function->getAddressOfAddFunction(); + state.resize(function->sizeOfData()); + } + + void createState() + { + if (created) + return; + function->create(state.data()); + created = true; + } + + void destroyState() + { + if (!created) + return; + function->destroy(state.data()); + created = false; + } + /// Explicitly destroy aggregation state if the stream is terminated ~AggregateDescription() { - if (created) - function->destroy(state.data()); + destroyState(); } + + AggregateDescription() = default; + AggregateDescription(AggregateDescription &&) = default; + AggregateDescription(const AggregateDescription &) = delete; }; /// Stores numbers of key-columns and value-columns. diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index 1b30122270e..c9ba02d1e44 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -243,9 +243,7 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( /// Check consistency between offsets and elements subcolumns. /// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER. - if (nested_column.empty()) - column_array.getOffsetsColumn() = column_array.getOffsetsColumn()->cloneEmpty(); - else if (nested_column.size() != last_offset) + if (!nested_column.empty() && nested_column.size() != last_offset) throw Exception("Cannot read all array values", ErrorCodes::CANNOT_READ_ALL_DATA); } diff --git a/dbms/src/Dictionaries/Embedded/RegionsNames.h b/dbms/src/Dictionaries/Embedded/RegionsNames.h index 52b145d1718..8cad2f36f50 100644 --- a/dbms/src/Dictionaries/Embedded/RegionsNames.h +++ b/dbms/src/Dictionaries/Embedded/RegionsNames.h @@ -68,7 +68,7 @@ private: using StringRefsForLanguageID = std::vector; public: - /** Reboot, if necessary, the names of regions. + /** Reload the names of regions if necessary. */ void reload(const Poco::Util::AbstractConfiguration & config); void reload(const std::string & directory); diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index cc837c4a824..40ddaa386a2 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -25,7 +25,8 @@ #include #include #include -#include +#include +#include #include #include #include @@ -90,6 +91,8 @@ struct ContextShared { Logger * log = &Logger::get("Context"); + std::shared_ptr runtime_components_factory; + /// For access of most of shared objects. Recursive mutex. mutable Poco::Mutex mutex; /// Separate mutex for access of dictionaries. Separate mutex to avoid locks when server doing request to itself. @@ -115,7 +118,7 @@ struct ContextShared mutable std::shared_ptr external_dictionaries; mutable std::shared_ptr external_models; String default_profile_name; /// Default profile name used for default values. - Users users; /// Known users. + std::shared_ptr security_manager; /// Known users. Quotas quotas; /// Known quotas for resource use. mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks. mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files. @@ -181,7 +184,8 @@ struct ContextShared pcg64 rng{randomSeed()}; - ContextShared() + ContextShared(std::shared_ptr runtime_components_factory_) + : runtime_components_factory(std::move(runtime_components_factory_)) { /// TODO: make it singleton (?) static std::atomic num_calls{0}; @@ -191,6 +195,8 @@ struct ContextShared std::cerr.flush(); std::terminate(); } + + initialize(); } @@ -236,21 +242,32 @@ struct ContextShared databases.clear(); } } + +private: + void initialize() + { + security_manager = runtime_components_factory->createSecurityManager(); + } }; Context::Context() = default; -Context Context::createGlobal() +Context Context::createGlobal(std::shared_ptr runtime_components_factory) { Context res; - res.shared = std::make_shared(); + res.runtime_components_factory = runtime_components_factory; + res.shared = std::make_shared(runtime_components_factory); res.quota = std::make_shared(); res.system_logs = std::make_shared(); return res; } +Context Context::createGlobal() +{ + return createGlobal(std::make_unique()); +} Context::~Context() { @@ -512,7 +529,7 @@ void Context::setUsersConfig(const ConfigurationPtr & config) { auto lock = getLock(); shared->users_config = config; - shared->users.loadFromConfig(*shared->users_config); + shared->security_manager->loadFromConfig(*shared->users_config); shared->quotas.loadFromConfig(*shared->users_config); } @@ -526,7 +543,7 @@ void Context::calculateUserSettings() { auto lock = getLock(); - String profile = shared->users.get(client_info.current_user).profile; + String profile = shared->security_manager->getUser(client_info.current_user).profile; /// 1) Set default settings (hardcoded values) /// NOTE: we ignore global_context settings (from which it is usually copied) @@ -547,7 +564,7 @@ void Context::setUser(const String & name, const String & password, const Poco:: { auto lock = getLock(); - const User & user_props = shared->users.get(name, password, address.host()); + const User & user_props = shared->security_manager->authorizeAndGetUser(name, password, address.host()); client_info.current_user = name; client_info.current_address = address; @@ -582,7 +599,7 @@ void Context::checkDatabaseAccessRights(const std::string & database_name) const /// All users have access to the database system. return; } - if (!shared->users.isAllowedDatabase(client_info.current_user, database_name)) + if (!shared->security_manager->hasAccessToDatabase(client_info.current_user, database_name)) throw Exception("Access denied to database " + database_name, ErrorCodes::DATABASE_ACCESS_DENIED); } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 6a6ce25ee17..4e4be8f48fc 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -32,6 +32,7 @@ namespace DB { struct ContextShared; +class IRuntimeComponentsFactory; class QuotaForIntervals; class EmbeddedDictionaries; class ExternalDictionaries; @@ -89,6 +90,8 @@ private: using Shared = std::shared_ptr; Shared shared; + std::shared_ptr runtime_components_factory; + ClientInfo client_info; std::shared_ptr quota; /// Current quota. By default - empty quota, that have no limits. @@ -116,6 +119,7 @@ private: public: /// Create initial Context with ContextShared and etc. + static Context createGlobal(std::shared_ptr runtime_components_factory); static Context createGlobal(); ~Context(); diff --git a/dbms/src/Interpreters/IRuntimeComponentsFactory.h b/dbms/src/Interpreters/IRuntimeComponentsFactory.h new file mode 100644 index 00000000000..9d2d611629e --- /dev/null +++ b/dbms/src/Interpreters/IRuntimeComponentsFactory.h @@ -0,0 +1,22 @@ +#pragma once + +#include + +#include + +namespace DB +{ + +/** Factory of query engine runtime components / services. + * Helps to host query engine in external applications + * by replacing or reconfiguring its components. + */ +class IRuntimeComponentsFactory +{ +public: + virtual std::unique_ptr createSecurityManager() = 0; + + virtual ~IRuntimeComponentsFactory() {} +}; + +} diff --git a/dbms/src/Interpreters/ISecurityManager.h b/dbms/src/Interpreters/ISecurityManager.h new file mode 100644 index 00000000000..005156647c0 --- /dev/null +++ b/dbms/src/Interpreters/ISecurityManager.h @@ -0,0 +1,33 @@ +#pragma once + +#include + +namespace DB +{ + +/** Duties of security manager: + * 1) Authenticate users + * 2) Provide user settings (profile, quota, ACLs) + * 3) Grant access to databases + */ +class ISecurityManager +{ +public: + virtual void loadFromConfig(Poco::Util::AbstractConfiguration & config) = 0; + + /// Find user and make authorize checks + virtual const User & authorizeAndGetUser( + const String & user_name, + const String & password, + const Poco::Net::IPAddress & address) const = 0; + + /// Just find user + virtual const User & getUser(const String & user_name) const = 0; + + /// Check if the user has access to the database. + virtual bool hasAccessToDatabase(const String & user_name, const String & database_name) const = 0; + + virtual ~ISecurityManager() {} +}; + +} diff --git a/dbms/src/Interpreters/RuntimeComponentsFactory.h b/dbms/src/Interpreters/RuntimeComponentsFactory.h new file mode 100644 index 00000000000..ce4056a5ea3 --- /dev/null +++ b/dbms/src/Interpreters/RuntimeComponentsFactory.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/** Default implementation of runtime components factory + * used by native server application. + */ +class RuntimeComponentsFactory : public IRuntimeComponentsFactory +{ +public: + std::unique_ptr createSecurityManager() override + { + return std::make_unique(); + } +}; + +} diff --git a/dbms/src/Interpreters/SecurityManager.cpp b/dbms/src/Interpreters/SecurityManager.cpp new file mode 100644 index 00000000000..99a89d3cb09 --- /dev/null +++ b/dbms/src/Interpreters/SecurityManager.cpp @@ -0,0 +1,114 @@ +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int DNS_ERROR; + extern const int UNKNOWN_ADDRESS_PATTERN_TYPE; + extern const int UNKNOWN_USER; + extern const int REQUIRED_PASSWORD; + extern const int WRONG_PASSWORD; + extern const int IP_ADDRESS_NOT_ALLOWED; + extern const int BAD_ARGUMENTS; +} + +void SecurityManager::loadFromConfig(Poco::Util::AbstractConfiguration & config) +{ + Container new_users; + + Poco::Util::AbstractConfiguration::Keys config_keys; + config.keys("users", config_keys); + + for (const std::string & key : config_keys) + new_users.emplace(std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple(key, "users." + key, config)); + + users = std::move(new_users); +} + +const User & SecurityManager::authorizeAndGetUser( + const String & user_name, + const String & password, + const Poco::Net::IPAddress & address) const +{ + auto it = users.find(user_name); + + if (users.end() == it) + throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER); + + if (!it->second.addresses.contains(address)) + throw Exception("User " + user_name + " is not allowed to connect from address " + address.toString(), ErrorCodes::IP_ADDRESS_NOT_ALLOWED); + + auto on_wrong_password = [&]() + { + if (password.empty()) + throw Exception("Password required for user " + user_name, ErrorCodes::REQUIRED_PASSWORD); + else + throw Exception("Wrong password for user " + user_name, ErrorCodes::WRONG_PASSWORD); + }; + + if (!it->second.password_sha256_hex.empty()) + { + unsigned char hash[32]; + + SHA256_CTX ctx; + SHA256_Init(&ctx); + SHA256_Update(&ctx, reinterpret_cast(password.data()), password.size()); + SHA256_Final(hash, &ctx); + + String hash_hex; + { + WriteBufferFromString buf(hash_hex); + HexWriteBuffer hex_buf(buf); + hex_buf.write(reinterpret_cast(hash), sizeof(hash)); + } + + Poco::toLowerInPlace(hash_hex); + + if (hash_hex != it->second.password_sha256_hex) + on_wrong_password(); + } + else if (password != it->second.password) + { + on_wrong_password(); + } + + return it->second; +} + +const User & SecurityManager::getUser(const String & user_name) const +{ + auto it = users.find(user_name); + + if (users.end() == it) + throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER); + + return it->second; +} + +bool SecurityManager::hasAccessToDatabase(const std::string & user_name, const std::string & database_name) const +{ + auto it = users.find(user_name); + + if (users.end() == it) + throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER); + + const auto & user = it->second; + return user.databases.empty() || user.databases.count(database_name); +} + +} diff --git a/dbms/src/Interpreters/SecurityManager.h b/dbms/src/Interpreters/SecurityManager.h new file mode 100644 index 00000000000..9ce41d39b71 --- /dev/null +++ b/dbms/src/Interpreters/SecurityManager.h @@ -0,0 +1,32 @@ +#pragma once + +#include + +#include + +namespace DB +{ + +/** Default implementation of security manager used by native server application. + * Manages fixed set of users listed in 'Users' configuration file. + */ +class SecurityManager : public ISecurityManager +{ +private: + using Container = std::map; + Container users; + +public: + void loadFromConfig(Poco::Util::AbstractConfiguration & config) override; + + const User & authorizeAndGetUser( + const String & user_name, + const String & password, + const Poco::Net::IPAddress & address) const override; + + const User & getUser(const String & user_name) const override; + + bool hasAccessToDatabase(const String & user_name, const String & database_name) const override; +}; + +} diff --git a/dbms/src/Interpreters/Users.cpp b/dbms/src/Interpreters/Users.cpp index d539cececbb..ebe913eea94 100644 --- a/dbms/src/Interpreters/Users.cpp +++ b/dbms/src/Interpreters/Users.cpp @@ -306,87 +306,4 @@ User::User(const String & name_, const String & config_elem, Poco::Util::Abstrac } -void Users::loadFromConfig(Poco::Util::AbstractConfiguration & config) -{ - Container new_cont; - - Poco::Util::AbstractConfiguration::Keys config_keys; - config.keys("users", config_keys); - - for (const std::string & key : config_keys) - new_cont.emplace(std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple(key, "users." + key, config)); - - cont = std::move(new_cont); -} - -const User & Users::get(const String & user_name, const String & password, const Poco::Net::IPAddress & address) const -{ - auto it = cont.find(user_name); - - if (cont.end() == it) - throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER); - - if (!it->second.addresses.contains(address)) - throw Exception("User " + user_name + " is not allowed to connect from address " + address.toString(), ErrorCodes::IP_ADDRESS_NOT_ALLOWED); - - auto on_wrong_password = [&]() - { - if (password.empty()) - throw Exception("Password required for user " + user_name, ErrorCodes::REQUIRED_PASSWORD); - else - throw Exception("Wrong password for user " + user_name, ErrorCodes::WRONG_PASSWORD); - }; - - if (!it->second.password_sha256_hex.empty()) - { - unsigned char hash[32]; - - SHA256_CTX ctx; - SHA256_Init(&ctx); - SHA256_Update(&ctx, reinterpret_cast(password.data()), password.size()); - SHA256_Final(hash, &ctx); - - String hash_hex; - { - WriteBufferFromString buf(hash_hex); - HexWriteBuffer hex_buf(buf); - hex_buf.write(reinterpret_cast(hash), sizeof(hash)); - } - - Poco::toLowerInPlace(hash_hex); - - if (hash_hex != it->second.password_sha256_hex) - on_wrong_password(); - } - else if (password != it->second.password) - { - on_wrong_password(); - } - - return it->second; -} - - -const User & Users::get(const String & user_name) -{ - auto it = cont.find(user_name); - - if (cont.end() == it) - throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER); - - return it->second; -} - - -bool Users::isAllowedDatabase(const std::string & user_name, const std::string & database_name) const -{ - auto it = cont.find(user_name); - if (it == cont.end()) - throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER); - - const auto & user = it->second; - return user.databases.empty() || user.databases.count(database_name); -} - - } diff --git a/dbms/src/Interpreters/Users.h b/dbms/src/Interpreters/Users.h index 723b1f07324..96c5b3baefc 100644 --- a/dbms/src/Interpreters/Users.h +++ b/dbms/src/Interpreters/Users.h @@ -2,7 +2,6 @@ #include -#include #include #include #include @@ -38,7 +37,7 @@ public: class AddressPatterns { private: - using Container = std::vector>; + using Container = std::vector>; Container patterns; public: @@ -70,25 +69,4 @@ struct User }; -/// Known users. -class Users -{ -private: - using Container = std::map; - Container cont; - -public: - void loadFromConfig(Poco::Util::AbstractConfiguration & config); - - /// Find user and make authorize checks - const User & get(const String & user_name, const String & password, const Poco::Net::IPAddress & address) const; - - /// Just find user - const User & get(const String & user_name); - - /// Check if the user has access to the database. - bool isAllowedDatabase(const String & user_name, const String & database_name) const; -}; - - } diff --git a/dbms/src/Interpreters/tests/users.cpp b/dbms/src/Interpreters/tests/users.cpp index fa47d2cf11b..6b021052d7e 100644 --- a/dbms/src/Interpreters/tests/users.cpp +++ b/dbms/src/Interpreters/tests/users.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include @@ -204,11 +204,11 @@ void runOneTest(size_t test_num, const TestDescriptor & test_descriptor) throw std::runtime_error(os.str()); } - DB::Users users; + DB::SecurityManager security_manager; try { - users.loadFromConfig(*config); + security_manager.loadFromConfig(*config); } catch (const Poco::Exception & ex) { @@ -223,7 +223,7 @@ void runOneTest(size_t test_num, const TestDescriptor & test_descriptor) try { - res = users.isAllowedDatabase(entry.user_name, entry.database_name); + res = security_manager.hasAccessToDatabase(entry.user_name, entry.database_name); } catch (const Poco::Exception &) { diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index efd083e7c96..daa9e8c59f1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -431,17 +431,35 @@ void MergeTreeReader::readData( } +static bool arrayHasNoElementsRead(const IColumn & column) +{ + const ColumnArray * column_array = typeid_cast(&column); + + if (!column_array) + return false; + + size_t size = column_array->size(); + if (!size) + return false; + + size_t data_size = column_array->getData().size(); + if (data_size) + return false; + + size_t last_offset = column_array->getOffsets()[size - 1]; + return last_offset != 0; +} + + void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_names, bool always_reorder) { if (!res) - throw Exception("Empty block passed to fillMissingColumnsImpl", ErrorCodes::LOGICAL_ERROR); + throw Exception("Empty block passed to fillMissingColumns", ErrorCodes::LOGICAL_ERROR); try { /// For a missing column of a nested data structure we must create not a column of empty /// arrays, but a column of arrays of correct length. - /// TODO: If for some nested data structure only missing columns were selected, the arrays in these columns will be empty, - /// even if the offsets for this nested structure are present in the current part. This can be fixed. /// NOTE: Similar, but slightly different code is present in Block::addDefaults. /// First, collect offset columns for all arrays in the block. @@ -462,13 +480,26 @@ void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_name } } - auto should_evaluate_defaults = false; - auto should_sort = always_reorder; + bool should_evaluate_defaults = false; + bool should_sort = always_reorder; + size_t rows = res.rows(); + + /// insert default values only for columns without default expressions for (const auto & requested_column : columns) { - /// insert default values only for columns without default expressions - if (!res.has(requested_column.name)) + bool has_column = res.has(requested_column.name); + if (has_column) + { + const auto & col = *res.getByName(requested_column.name).column; + if (arrayHasNoElementsRead(col)) + { + res.erase(requested_column.name); + has_column = false; + } + } + + if (!has_column) { should_sort = true; if (storage.column_defaults.count(requested_column.name) != 0) @@ -499,7 +530,7 @@ void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_name /// We must turn a constant column into a full column because the interpreter could infer that it is constant everywhere /// but in some blocks (from other parts) it can be a full column. column_to_add.column = column_to_add.type->createConstColumn( - res.rows(), column_to_add.type->getDefault())->convertToFullColumnIfConst(); + rows, column_to_add.type->getDefault())->convertToFullColumnIfConst(); } res.insert(std::move(column_to_add)); diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 178713ea4b8..2c030a795ab 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -20,7 +20,6 @@ #include #include -#include #include @@ -272,13 +271,18 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type, String stream_name = IDataType::getFileNameForStream(name, path); + const auto & file_it = storage.files.find(stream_name); + if (storage.files.end() == file_it) + throw Exception("Logical error: no information about file " + stream_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR); + std::cerr << "Stream: " << stream_name << "\n"; - std::cerr << "Offset: " << storage.files[stream_name].marks[mark_number].offset << "\n"; + std::cerr << "Mark number: " << mark_number << "\n"; + std::cerr << "Offset: " << file_it->second.marks[mark_number].offset << "\n"; auto it = streams.try_emplace(stream_name, - storage.files[stream_name].data_file.path(), + file_it->second.data_file.path(), mark_number - ? storage.files[stream_name].marks[mark_number].offset + ? file_it->second.marks[mark_number].offset : 0, max_read_buffer_size).first; @@ -339,13 +343,15 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type type.enumerateStreams([&] (const IDataType::SubstreamPath & path) { String stream_name = IDataType::getFileNameForStream(name, path); + if (written_streams.count(stream_name)) + return; const auto & file = storage.files[stream_name]; - const auto stream_it = streams.find(stream_name); + const auto stream_it = streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size).first; Mark mark; mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size(); - mark.offset = stream_it != streams.end() ? stream_it->second.plain_offset + stream_it->second.plain.count() : 0; + mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count(); out_marks.emplace_back(file.column_index, mark); }, {}); @@ -353,12 +359,13 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> WriteBuffer * { String stream_name = IDataType::getFileNameForStream(name, path); - - auto it_inserted = streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size); - if (!it_inserted.second) + if (written_streams.count(stream_name)) return nullptr; - return &it_inserted.first->second.compressed; + auto it = streams.find(stream_name); + if (streams.end() == it) + throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR); + return &it->second.compressed; }; type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, 0, 0, true, {}); @@ -366,6 +373,9 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type type.enumerateStreams([&] (const IDataType::SubstreamPath & path) { String stream_name = IDataType::getFileNameForStream(name, path); + if (!written_streams.emplace(stream_name).second) + return; + auto it = streams.find(stream_name); if (streams.end() == it) throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR); @@ -401,7 +411,7 @@ StorageLog::StorageLog( size_t max_compress_block_size_) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, path(path_), name(name_), columns(columns_), - loaded_marks(false), max_compress_block_size(max_compress_block_size_), + max_compress_block_size(max_compress_block_size_), file_checker(path + escapeForFileName(name) + '/' + "sizes.json") { if (columns->empty()) @@ -484,12 +494,6 @@ void StorageLog::loadMarks() } -size_t StorageLog::marksCount() -{ - return files.begin()->second.marks.size(); -} - - void StorageLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) { std::unique_lock lock(rwlock); diff --git a/dbms/src/Storages/StorageLog.h b/dbms/src/Storages/StorageLog.h index 44c6c488679..fad8b2d0309 100644 --- a/dbms/src/Storages/StorageLog.h +++ b/dbms/src/Storages/StorageLog.h @@ -68,7 +68,7 @@ private: */ struct Mark { - size_t rows; /// How many rows are before this offset. + size_t rows; /// How many rows are before this offset including the block at this offset. size_t offset; /// The offset in compressed file. }; @@ -95,7 +95,7 @@ private: /// The order of adding files should not change: it corresponds to the order of the columns in the marks file. void addFiles(const String & column_name, const IDataType & type); - bool loaded_marks; + bool loaded_marks = false; size_t max_compress_block_size; size_t file_count = 0; @@ -107,11 +107,6 @@ private: /// You can not call with a write locked `rwlock`. void loadMarks(); - /// Can be called with any state of `rwlock`. - size_t marksCount(); - - void loadMarksImpl(bool load_null_marks); - /// The order of adding files should not change: it corresponds to the order of the columns in the marks file. void addFile(const String & column_name, const IDataType & type, size_t level = 0); diff --git a/dbms/tests/instructions/heap-profiler.txt b/dbms/tests/instructions/heap-profiler.txt new file mode 100644 index 00000000000..bf9eb7ec3b0 --- /dev/null +++ b/dbms/tests/instructions/heap-profiler.txt @@ -0,0 +1,14 @@ +Build clickhouse without tcmalloc. cmake -D ENABLE_TCMALLOC=0 + +Copy clickhouse binary to your server. +scp dbms/src/Server/clickhouse server:~ + +ssh to your server + +Stop clickhouse: +sudo service clickhouse-server stop + +Run clickhouse with heap profiler from the terminal: +sudo -u clickhouse LD_PRELOAD=/usr/lib/libtcmalloc.so HEAPPROFILE=/var/log/clickhouse-server/heap.hprof ./clickhouse server --config /etc/clickhouse-server/config.xml + +Profiles will appear in /var/log/clickhouse-server/ diff --git a/dbms/tests/queries/0_stateless/00327_summing_composite_nested.sql b/dbms/tests/queries/0_stateless/00327_summing_composite_nested.sql index a9ea7d399a7..43b37616941 100644 --- a/dbms/tests/queries/0_stateless/00327_summing_composite_nested.sql +++ b/dbms/tests/queries/0_stateless/00327_summing_composite_nested.sql @@ -18,11 +18,7 @@ SELECT d, k, m.k1ID, m.k2Key, m.k3Type, m.s FROM test.summing_composite_key ARRA SELECT d, k, m.k1ID, m.k2Key, m.k3Type, sum(m.s) FROM test.summing_composite_key ARRAY JOIN SecondMap AS m GROUP BY d, k, m.k1ID, m.k2Key, m.k3Type ORDER BY d, k, m.k1ID, m.k2Key, m.k3Type; SELECT d, k, m.k1ID, m.k2Key, m.k3Type, m.s FROM test.summing_composite_key FINAL ARRAY JOIN SecondMap AS m ORDER BY d, k, m.k1ID, m.k2Key, m.k3Type, m.s; -OPTIMIZE TABLE test.summing_composite_key; -OPTIMIZE TABLE test.summing_composite_key; -OPTIMIZE TABLE test.summing_composite_key; -OPTIMIZE TABLE test.summing_composite_key; -OPTIMIZE TABLE test.summing_composite_key; +OPTIMIZE TABLE test.summing_composite_key PARTITION 200001 FINAL; SELECT * FROM test.summing_composite_key ORDER BY d, k, _part_index; diff --git a/debian/changelog b/debian/changelog index c3ad4753187..303ed070568 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,5 +1,5 @@ -clickhouse (1.1.54312) unstable; urgency=low +clickhouse (1.1.54314) unstable; urgency=low * Modified source code - -- Wed, 15 Nov 2017 16:12:02 +0300 + -- Tue, 28 Nov 2017 13:25:47 +0300 diff --git a/libs/libcommon/CMakeLists.txt b/libs/libcommon/CMakeLists.txt index 742b2633f40..843226ec342 100644 --- a/libs/libcommon/CMakeLists.txt +++ b/libs/libcommon/CMakeLists.txt @@ -66,7 +66,7 @@ if (USE_JEMALLOC) message (STATUS "Link jemalloc : ${JEMALLOC_LIBRARIES}") set (MALLOC_LIBRARIES ${JEMALLOC_LIBRARIES}) elseif (USE_TCMALLOC) - if (DEBUG_LIBTCMALLOC) + if (DEBUG_TCMALLOC) message (STATUS "Link libtcmalloc_minimal_debug for testing: ${GPERFTOOLS_TCMALLOC_MINIMAL_DEBUG}") set (MALLOC_LIBRARIES ${GPERFTOOLS_TCMALLOC_MINIMAL_DEBUG}) else () diff --git a/libs/libcommon/cmake/find_gperftools.cmake b/libs/libcommon/cmake/find_gperftools.cmake index 31c9373586c..479c9b80b01 100644 --- a/libs/libcommon/cmake/find_gperftools.cmake +++ b/libs/libcommon/cmake/find_gperftools.cmake @@ -5,16 +5,16 @@ else () endif () if (CMAKE_SYSTEM MATCHES "FreeBSD") - option (ENABLE_LIBTCMALLOC "Set to TRUE to enable libtcmalloc" OFF) + option (ENABLE_TCMALLOC "Set to TRUE to enable tcmalloc" OFF) else () - option (ENABLE_LIBTCMALLOC "Set to TRUE to enable libtcmalloc" ON) + option (ENABLE_TCMALLOC "Set to TRUE to enable tcmalloc" ON) endif () -option (DEBUG_LIBTCMALLOC "Set to TRUE to use debug version of libtcmalloc" OFF) +option (DEBUG_TCMALLOC "Set to TRUE to use debug version of libtcmalloc" OFF) -if (ENABLE_LIBTCMALLOC) +if (ENABLE_TCMALLOC) #contrib/libtcmalloc doesnt build debug version, try find in system - if (DEBUG_LIBTCMALLOC OR NOT USE_INTERNAL_GPERFTOOLS_LIBRARY) + if (DEBUG_TCMALLOC OR NOT USE_INTERNAL_GPERFTOOLS_LIBRARY) find_package (Gperftools) endif () diff --git a/utils/compressor/CMakeLists.txt b/utils/compressor/CMakeLists.txt index 06b6a16ea1e..62e11a75891 100644 --- a/utils/compressor/CMakeLists.txt +++ b/utils/compressor/CMakeLists.txt @@ -1,3 +1,4 @@ +find_package (Threads) add_executable (clickhouse-compressor main.cpp) target_link_libraries (clickhouse-compressor clickhouse-compressor-lib) @@ -5,4 +6,4 @@ target_link_libraries (clickhouse-compressor clickhouse-compressor-lib) install (TARGETS clickhouse-compressor RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse-compressor) add_executable (zstd_test zstd_test.cpp) -target_link_libraries (zstd_test ${ZSTD_LIBRARY}) +target_link_libraries (zstd_test ${ZSTD_LIBRARY} Threads::Threads)