Merge branch 'unify-data-types-that-serialized-with-multiple-streams' of github.com:yandex/ClickHouse into unify-data-types-that-serialized-with-multiple-streams

This commit is contained in:
Alexey Milovidov 2017-12-01 00:42:09 +03:00
commit 304acb9a8e
27 changed files with 392 additions and 196 deletions

View File

@ -151,6 +151,8 @@ if (MAKE_STATIC_LIBRARIES AND NOT APPLE AND NOT (CMAKE_CXX_COMPILER_ID STREQUAL
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++")
endif ()
set(THREADS_PREFER_PTHREAD_FLAG ON)
include (cmake/test_compiler.cmake)
if (CMAKE_SYSTEM MATCHES "Linux" AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
@ -158,7 +160,7 @@ if (CMAKE_SYSTEM MATCHES "Linux" AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++ (only make sense on Linux with Clang)" ${HAVE_LIBCXX})
if (USE_LIBCXX)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++") # Ok for clang6, for older can cause 'not used option' worning
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++") # Ok for clang6, for older can cause 'not used option' warning
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_DEBUG=1") # More checks in debug build.
if (MAKE_STATIC_LIBRARIES)
link_libraries (-Wl,-Bstatic -stdlib=libc++ c++ c++abi -Wl,-Bdynamic)

View File

@ -64,7 +64,7 @@ if (USE_INTERNAL_CCTZ_LIBRARY)
add_subdirectory (cctz-cmake)
endif ()
if (ENABLE_LIBTCMALLOC AND USE_INTERNAL_GPERFTOOLS_LIBRARY)
if (ENABLE_TCMALLOC AND USE_INTERNAL_GPERFTOOLS_LIBRARY)
add_subdirectory (libtcmalloc)
endif ()

View File

@ -1,6 +1,6 @@
# This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54312-testing)
set(VERSION_REVISION 54312)
set(VERSION_DESCRIBE v1.1.54314-testing)
set(VERSION_REVISION 54314)
# end of autochange
set (VERSION_MAJOR 1)

View File

@ -12,6 +12,7 @@ add_executable (move_field move_field.cpp)
target_link_libraries (move_field clickhouse_common_io)
add_executable (rvo_test rvo_test.cpp)
target_link_libraries (rvo_test Threads::Threads)
add_executable (string_ref_hash string_ref_hash.cpp)
target_link_libraries (string_ref_hash clickhouse_common_io)

View File

@ -62,12 +62,10 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(ColumnPlainPtrs & m
}
catch (...)
{
desc.function->destroy(desc.state.data());
desc.created = false;
desc.destroyState();
throw;
}
desc.function->destroy(desc.state.data());
desc.created = false;
desc.destroyState();
}
else
desc.merged_column->insertDefault();
@ -123,8 +121,6 @@ Block SummingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (current_row.empty())
{
auto & factory = AggregateFunctionFactory::instance();
current_row.resize(num_columns);
next_key.columns.resize(description.size());
@ -178,12 +174,9 @@ Block SummingSortedBlockInputStream::readImpl()
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
// Create aggregator to sum this column
auto desc = AggregateDescription{};
AggregateDescription desc;
desc.column_numbers = {i};
desc.function = factory.get("sumWithOverflow", {column.type});
desc.function->setArguments({column.type});
desc.add_function = desc.function->getAddressOfAddFunction();
desc.state.resize(desc.function->sizeOfData());
desc.init("sumWithOverflow", {column.type});
columns_to_aggregate.emplace_back(std::move(desc));
}
else
@ -218,8 +211,8 @@ Block SummingSortedBlockInputStream::readImpl()
}
DataTypes argument_types = {};
auto desc = AggregateDescription{};
auto map_desc = MapDescription{};
AggregateDescription desc;
MapDescription map_desc;
column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
@ -263,10 +256,7 @@ Block SummingSortedBlockInputStream::readImpl()
if (map_desc.key_col_nums.size() == 1)
{
// Create summation for all value columns in the map
desc.function = factory.get("sumMap", argument_types);
desc.function->setArguments(argument_types);
desc.add_function = desc.function->getAddressOfAddFunction();
desc.state.resize(desc.function->sizeOfData());
desc.init("sumMap", argument_types);
columns_to_aggregate.emplace_back(std::move(desc));
}
else
@ -345,10 +335,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
/// Reset aggregation states for next row
for (auto & desc : columns_to_aggregate)
{
desc.function->create(desc.state.data());
desc.created = true;
}
desc.createState();
// Start aggregations with current row
addRow(current_row, current);

View File

@ -4,6 +4,8 @@
#include <Core/ColumnNumbers.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
namespace DB
{
@ -79,12 +81,39 @@ private:
std::vector<char> state;
bool created = false;
/// Looks up the aggregate function `function_name` for the given argument types
/// and prepares this description for use: binds the arguments, caches the address
/// of the add function and sizes the state buffer to what the function requires.
/// The state itself is not created here.
void init(const char * function_name, const DataTypes & argument_types)
{
    auto & factory = AggregateFunctionFactory::instance();

    function = factory.get(function_name, argument_types);
    function->setArguments(argument_types);

    add_function = function->getAddressOfAddFunction();

    const auto state_size = function->sizeOfData();
    state.resize(state_size);
}
/// Creates the aggregation state inside the `state` buffer.
/// Does nothing if the state is already marked as created.
void createState()
{
    if (!created)
    {
        function->create(state.data());
        created = true;
    }
}
/// Destroys the aggregation state held in the `state` buffer.
/// Does nothing if no state is currently marked as created.
void destroyState()
{
    if (created)
    {
        function->destroy(state.data());
        created = false;
    }
}
/// Explicitly destroy aggregation state if the stream is terminated
~AggregateDescription()
{
if (created)
function->destroy(state.data());
destroyState();
}
AggregateDescription() = default;
AggregateDescription(AggregateDescription &&) = default;
AggregateDescription(const AggregateDescription &) = delete;
};
/// Stores numbers of key-columns and value-columns.

View File

@ -243,9 +243,7 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
/// Check consistency between offsets and elements subcolumns.
/// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER.
if (nested_column.empty())
column_array.getOffsetsColumn() = column_array.getOffsetsColumn()->cloneEmpty();
else if (nested_column.size() != last_offset)
if (!nested_column.empty() && nested_column.size() != last_offset)
throw Exception("Cannot read all array values", ErrorCodes::CANNOT_READ_ALL_DATA);
}

View File

@ -68,7 +68,7 @@ private:
using StringRefsForLanguageID = std::vector<StringRefs>;
public:
/** Reboot, if necessary, the names of regions.
/** Reload the names of regions if necessary.
*/
void reload(const Poco::Util::AbstractConfiguration & config);
void reload(const std::string & directory);

View File

@ -25,7 +25,8 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionSettingsSelector.h>
#include <Interpreters/Settings.h>
#include <Interpreters/Users.h>
#include <Interpreters/RuntimeComponentsFactory.h>
#include <Interpreters/ISecurityManager.h>
#include <Interpreters/Quota.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ExternalDictionaries.h>
@ -90,6 +91,8 @@ struct ContextShared
{
Logger * log = &Logger::get("Context");
std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory;
/// For access of most of shared objects. Recursive mutex.
mutable Poco::Mutex mutex;
/// Separate mutex for access of dictionaries. Separate mutex to avoid locks when server doing request to itself.
@ -115,7 +118,7 @@ struct ContextShared
mutable std::shared_ptr<ExternalDictionaries> external_dictionaries;
mutable std::shared_ptr<ExternalModels> external_models;
String default_profile_name; /// Default profile name used for default values.
Users users; /// Known users.
std::shared_ptr<ISecurityManager> security_manager; /// Known users.
Quotas quotas; /// Known quotas for resource use.
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
@ -181,7 +184,8 @@ struct ContextShared
pcg64 rng{randomSeed()};
ContextShared()
ContextShared(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory_)
: runtime_components_factory(std::move(runtime_components_factory_))
{
/// TODO: make it singleton (?)
static std::atomic<size_t> num_calls{0};
@ -191,6 +195,8 @@ struct ContextShared
std::cerr.flush();
std::terminate();
}
initialize();
}
@ -236,21 +242,32 @@ struct ContextShared
databases.clear();
}
}
private:
void initialize()
{
security_manager = runtime_components_factory->createSecurityManager();
}
};
Context::Context() = default;
Context Context::createGlobal()
Context Context::createGlobal(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory)
{
Context res;
res.shared = std::make_shared<ContextShared>();
res.runtime_components_factory = runtime_components_factory;
res.shared = std::make_shared<ContextShared>(runtime_components_factory);
res.quota = std::make_shared<QuotaForIntervals>();
res.system_logs = std::make_shared<SystemLogs>();
return res;
}
/// Creates the global context using the default RuntimeComponentsFactory.
/// The factory is allocated directly as a shared_ptr, which is the type the
/// one-argument overload accepts, instead of being created as a unique_ptr
/// and converted afterwards.
Context Context::createGlobal()
{
    return createGlobal(std::make_shared<RuntimeComponentsFactory>());
}
Context::~Context()
{
@ -512,7 +529,7 @@ void Context::setUsersConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
shared->users_config = config;
shared->users.loadFromConfig(*shared->users_config);
shared->security_manager->loadFromConfig(*shared->users_config);
shared->quotas.loadFromConfig(*shared->users_config);
}
@ -526,7 +543,7 @@ void Context::calculateUserSettings()
{
auto lock = getLock();
String profile = shared->users.get(client_info.current_user).profile;
String profile = shared->security_manager->getUser(client_info.current_user).profile;
/// 1) Set default settings (hardcoded values)
/// NOTE: we ignore global_context settings (from which it is usually copied)
@ -547,7 +564,7 @@ void Context::setUser(const String & name, const String & password, const Poco::
{
auto lock = getLock();
const User & user_props = shared->users.get(name, password, address.host());
const User & user_props = shared->security_manager->authorizeAndGetUser(name, password, address.host());
client_info.current_user = name;
client_info.current_address = address;
@ -582,7 +599,7 @@ void Context::checkDatabaseAccessRights(const std::string & database_name) const
/// All users have access to the database system.
return;
}
if (!shared->users.isAllowedDatabase(client_info.current_user, database_name))
if (!shared->security_manager->hasAccessToDatabase(client_info.current_user, database_name))
throw Exception("Access denied to database " + database_name, ErrorCodes::DATABASE_ACCESS_DENIED);
}

View File

@ -32,6 +32,7 @@ namespace DB
{
struct ContextShared;
class IRuntimeComponentsFactory;
class QuotaForIntervals;
class EmbeddedDictionaries;
class ExternalDictionaries;
@ -89,6 +90,8 @@ private:
using Shared = std::shared_ptr<ContextShared>;
Shared shared;
std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory;
ClientInfo client_info;
std::shared_ptr<QuotaForIntervals> quota; /// Current quota. By default - empty quota, that have no limits.
@ -116,6 +119,7 @@ private:
public:
/// Create initial Context with ContextShared and etc.
static Context createGlobal(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory);
static Context createGlobal();
~Context();

View File

@ -0,0 +1,22 @@
#pragma once
#include <Interpreters/ISecurityManager.h>
#include <memory>
namespace DB
{
/** Factory of runtime components (services) of the query engine.
  * Makes it possible to host the query engine in an external application
  * by substituting or reconfiguring the components it creates.
  */
class IRuntimeComponentsFactory
{
public:
    /// Creates the security manager instance; ownership is passed to the caller.
    virtual std::unique_ptr<ISecurityManager> createSecurityManager() = 0;

    virtual ~IRuntimeComponentsFactory() = default;
};
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Interpreters/Users.h>
namespace DB
{
/** Responsibilities of a security manager:
  * 1) Authenticate users
  * 2) Provide user settings (profile, quota, ACLs)
  * 3) Grant access to databases
  */
class ISecurityManager
{
public:
    /// Load user definitions from the given configuration.
    virtual void loadFromConfig(Poco::Util::AbstractConfiguration & config) = 0;

    /// Find the user and perform the authorization checks
    /// against the supplied password and client address.
    virtual const User & authorizeAndGetUser(
        const String & user_name,
        const String & password,
        const Poco::Net::IPAddress & address) const = 0;

    /// Find the user by name only, without authorization checks.
    virtual const User & getUser(const String & user_name) const = 0;

    /// Check whether the user has access to the database.
    virtual bool hasAccessToDatabase(const String & user_name, const String & database_name) const = 0;

    virtual ~ISecurityManager() = default;
};
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <Interpreters/IRuntimeComponentsFactory.h>
#include <Interpreters/SecurityManager.h>
namespace DB
{
/** Default implementation of runtime components factory
* used by native server application.
*/
class RuntimeComponentsFactory : public IRuntimeComponentsFactory
{
public:
std::unique_ptr<ISecurityManager> createSecurityManager() override
{
return std::make_unique<SecurityManager>();
}
};
}

View File

@ -0,0 +1,114 @@
#include <Interpreters/SecurityManager.h>
#include <Poco/Net/IPAddress.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/String.h>
#include <Common/Exception.h>
#include <IO/HexWriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <openssl/sha.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DNS_ERROR;
extern const int UNKNOWN_ADDRESS_PATTERN_TYPE;
extern const int UNKNOWN_USER;
extern const int REQUIRED_PASSWORD;
extern const int WRONG_PASSWORD;
extern const int IP_ADDRESS_NOT_ALLOWED;
extern const int BAD_ARGUMENTS;
}
/// Rebuilds the user map from the children of the "users" key of `config`.
/// Every child key names a user; the User object is constructed from the
/// "users.<name>" configuration element.
void SecurityManager::loadFromConfig(Poco::Util::AbstractConfiguration & config)
{
    Poco::Util::AbstractConfiguration::Keys user_names;
    config.keys("users", user_names);

    /// Fill a temporary map first: `users` is replaced only after all entries were constructed.
    Container loaded_users;
    for (const std::string & user_name : user_names)
    {
        loaded_users.emplace(
            std::piecewise_construct,
            std::forward_as_tuple(user_name),
            std::forward_as_tuple(user_name, "users." + user_name, config));
    }

    users = std::move(loaded_users);
}
/// Finds the user and performs the authorization checks, in this order:
///  1) the user must exist (otherwise UNKNOWN_USER);
///  2) `address` must be accepted by the user's address patterns (otherwise IP_ADDRESS_NOT_ALLOWED);
///  3) the password must match (otherwise REQUIRED_PASSWORD if an empty password was given,
///     WRONG_PASSWORD if a non-empty one was given).
/// Returns a reference to the stored User on success.
const User & SecurityManager::authorizeAndGetUser(
    const String & user_name,
    const String & password,
    const Poco::Net::IPAddress & address) const
{
    const auto found = users.find(user_name);
    if (found == users.end())
        throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER);

    const User & user = found->second;

    if (!user.addresses.contains(address))
        throw Exception("User " + user_name + " is not allowed to connect from address " + address.toString(), ErrorCodes::IP_ADDRESS_NOT_ALLOWED);

    bool password_matches = false;

    if (user.password_sha256_hex.empty())
    {
        /// No hash is configured: compare against the plain-text password.
        password_matches = (password == user.password);
    }
    else
    {
        /// A hash is configured: compare it with the lower-case hex SHA-256 of the supplied password.
        unsigned char digest[32];

        SHA256_CTX sha_context;
        SHA256_Init(&sha_context);
        SHA256_Update(&sha_context, reinterpret_cast<const unsigned char *>(password.data()), password.size());
        SHA256_Final(digest, &sha_context);

        String digest_hex;
        {
            /// The buffers live in their own scope so they are destroyed before digest_hex is read.
            WriteBufferFromString string_buf(digest_hex);
            HexWriteBuffer hex_buf(string_buf);
            hex_buf.write(reinterpret_cast<const char *>(digest), sizeof(digest));
        }

        Poco::toLowerInPlace(digest_hex);
        password_matches = (digest_hex == user.password_sha256_hex);
    }

    if (!password_matches)
    {
        if (password.empty())
            throw Exception("Password required for user " + user_name, ErrorCodes::REQUIRED_PASSWORD);

        throw Exception("Wrong password for user " + user_name, ErrorCodes::WRONG_PASSWORD);
    }

    return user;
}
/// Looks the user up by name without any authorization checks.
/// Throws UNKNOWN_USER if there is no such user.
const User & SecurityManager::getUser(const String & user_name) const
{
    const auto found = users.find(user_name);

    if (found != users.end())
        return found->second;

    throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER);
}
/// Tells whether the user may access the database.
/// An empty `databases` set of the user means that every database is allowed.
/// Throws UNKNOWN_USER if there is no such user.
bool SecurityManager::hasAccessToDatabase(const std::string & user_name, const std::string & database_name) const
{
    const auto found = users.find(user_name);
    if (found == users.end())
        throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER);

    const auto & allowed_databases = found->second.databases;

    if (allowed_databases.empty())
        return true;

    return allowed_databases.count(database_name) != 0;
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Interpreters/ISecurityManager.h>
#include <map>
namespace DB
{
/** Default security manager, the one used by the native server application.
  * Manages the fixed set of users listed in the 'Users' configuration file.
  */
class SecurityManager : public ISecurityManager
{
public:
    /// Replace the set of known users with the ones described in the configuration.
    void loadFromConfig(Poco::Util::AbstractConfiguration & config) override;

    /// Find the user and perform the authorization checks.
    const User & authorizeAndGetUser(
        const String & user_name,
        const String & password,
        const Poco::Net::IPAddress & address) const override;

    /// Find the user by name only.
    const User & getUser(const String & user_name) const override;

    /// Check whether the user has access to the database.
    bool hasAccessToDatabase(const String & user_name, const String & database_name) const override;

private:
    /// Known users, keyed by user name.
    using Container = std::map<String, User>;
    Container users;
};
}

View File

@ -306,87 +306,4 @@ User::User(const String & name_, const String & config_elem, Poco::Util::Abstrac
}
void Users::loadFromConfig(Poco::Util::AbstractConfiguration & config)
{
Container new_cont;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys("users", config_keys);
for (const std::string & key : config_keys)
new_cont.emplace(std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple(key, "users." + key, config));
cont = std::move(new_cont);
}
const User & Users::get(const String & user_name, const String & password, const Poco::Net::IPAddress & address) const
{
auto it = cont.find(user_name);
if (cont.end() == it)
throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER);
if (!it->second.addresses.contains(address))
throw Exception("User " + user_name + " is not allowed to connect from address " + address.toString(), ErrorCodes::IP_ADDRESS_NOT_ALLOWED);
auto on_wrong_password = [&]()
{
if (password.empty())
throw Exception("Password required for user " + user_name, ErrorCodes::REQUIRED_PASSWORD);
else
throw Exception("Wrong password for user " + user_name, ErrorCodes::WRONG_PASSWORD);
};
if (!it->second.password_sha256_hex.empty())
{
unsigned char hash[32];
SHA256_CTX ctx;
SHA256_Init(&ctx);
SHA256_Update(&ctx, reinterpret_cast<const unsigned char *>(password.data()), password.size());
SHA256_Final(hash, &ctx);
String hash_hex;
{
WriteBufferFromString buf(hash_hex);
HexWriteBuffer hex_buf(buf);
hex_buf.write(reinterpret_cast<const char *>(hash), sizeof(hash));
}
Poco::toLowerInPlace(hash_hex);
if (hash_hex != it->second.password_sha256_hex)
on_wrong_password();
}
else if (password != it->second.password)
{
on_wrong_password();
}
return it->second;
}
const User & Users::get(const String & user_name)
{
auto it = cont.find(user_name);
if (cont.end() == it)
throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER);
return it->second;
}
bool Users::isAllowedDatabase(const std::string & user_name, const std::string & database_name) const
{
auto it = cont.find(user_name);
if (it == cont.end())
throw Exception("Unknown user " + user_name, ErrorCodes::UNKNOWN_USER);
const auto & user = it->second;
return user.databases.empty() || user.databases.count(database_name);
}
}

View File

@ -2,7 +2,6 @@
#include <Core/Types.h>
#include <map>
#include <vector>
#include <unordered_set>
#include <memory>
@ -38,7 +37,7 @@ public:
class AddressPatterns
{
private:
using Container = std::vector<std::unique_ptr<IAddressPattern>>;
using Container = std::vector<std::shared_ptr<IAddressPattern>>;
Container patterns;
public:
@ -70,25 +69,4 @@ struct User
};
/// Known users.
class Users
{
private:
using Container = std::map<String, User>;
Container cont;
public:
void loadFromConfig(Poco::Util::AbstractConfiguration & config);
/// Find user and make authorize checks
const User & get(const String & user_name, const String & password, const Poco::Net::IPAddress & address) const;
/// Just find user
const User & get(const String & user_name);
/// Check if the user has access to the database.
bool isAllowedDatabase(const String & user_name, const String & database_name) const;
};
}

View File

@ -1,5 +1,5 @@
#include <Common/ConfigProcessor.h>
#include <Interpreters/Users.h>
#include <Interpreters/SecurityManager.h>
#include <boost/filesystem.hpp>
@ -204,11 +204,11 @@ void runOneTest(size_t test_num, const TestDescriptor & test_descriptor)
throw std::runtime_error(os.str());
}
DB::Users users;
DB::SecurityManager security_manager;
try
{
users.loadFromConfig(*config);
security_manager.loadFromConfig(*config);
}
catch (const Poco::Exception & ex)
{
@ -223,7 +223,7 @@ void runOneTest(size_t test_num, const TestDescriptor & test_descriptor)
try
{
res = users.isAllowedDatabase(entry.user_name, entry.database_name);
res = security_manager.hasAccessToDatabase(entry.user_name, entry.database_name);
}
catch (const Poco::Exception &)
{

View File

@ -431,17 +431,35 @@ void MergeTreeReader::readData(
}
static bool arrayHasNoElementsRead(const IColumn & column)
{
const ColumnArray * column_array = typeid_cast<const ColumnArray *>(&column);
if (!column_array)
return false;
size_t size = column_array->size();
if (!size)
return false;
size_t data_size = column_array->getData().size();
if (data_size)
return false;
size_t last_offset = column_array->getOffsets()[size - 1];
return last_offset != 0;
}
void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_names, bool always_reorder)
{
if (!res)
throw Exception("Empty block passed to fillMissingColumnsImpl", ErrorCodes::LOGICAL_ERROR);
throw Exception("Empty block passed to fillMissingColumns", ErrorCodes::LOGICAL_ERROR);
try
{
/// For a missing column of a nested data structure we must create not a column of empty
/// arrays, but a column of arrays of correct length.
/// TODO: If for some nested data structure only missing columns were selected, the arrays in these columns will be empty,
/// even if the offsets for this nested structure are present in the current part. This can be fixed.
/// NOTE: Similar, but slightly different code is present in Block::addDefaults.
/// First, collect offset columns for all arrays in the block.
@ -462,13 +480,26 @@ void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_name
}
}
auto should_evaluate_defaults = false;
auto should_sort = always_reorder;
bool should_evaluate_defaults = false;
bool should_sort = always_reorder;
size_t rows = res.rows();
/// insert default values only for columns without default expressions
for (const auto & requested_column : columns)
{
/// insert default values only for columns without default expressions
if (!res.has(requested_column.name))
bool has_column = res.has(requested_column.name);
if (has_column)
{
const auto & col = *res.getByName(requested_column.name).column;
if (arrayHasNoElementsRead(col))
{
res.erase(requested_column.name);
has_column = false;
}
}
if (!has_column)
{
should_sort = true;
if (storage.column_defaults.count(requested_column.name) != 0)
@ -499,7 +530,7 @@ void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_name
/// We must turn a constant column into a full column because the interpreter could infer that it is constant everywhere
/// but in some blocks (from other parts) it can be a full column.
column_to_add.column = column_to_add.type->createConstColumn(
res.rows(), column_to_add.type->getDefault())->convertToFullColumnIfConst();
rows, column_to_add.type->getDefault())->convertToFullColumnIfConst();
}
res.insert(std::move(column_to_add));

View File

@ -20,7 +20,6 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Common/typeid_cast.h>
@ -272,13 +271,18 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type,
String stream_name = IDataType::getFileNameForStream(name, path);
const auto & file_it = storage.files.find(stream_name);
if (storage.files.end() == file_it)
throw Exception("Logical error: no information about file " + stream_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR);
std::cerr << "Stream: " << stream_name << "\n";
std::cerr << "Offset: " << storage.files[stream_name].marks[mark_number].offset << "\n";
std::cerr << "Mark number: " << mark_number << "\n";
std::cerr << "Offset: " << file_it->second.marks[mark_number].offset << "\n";
auto it = streams.try_emplace(stream_name,
storage.files[stream_name].data_file.path(),
file_it->second.data_file.path(),
mark_number
? storage.files[stream_name].marks[mark_number].offset
? file_it->second.marks[mark_number].offset
: 0,
max_read_buffer_size).first;
@ -339,13 +343,15 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
return;
const auto & file = storage.files[stream_name];
const auto stream_it = streams.find(stream_name);
const auto stream_it = streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size).first;
Mark mark;
mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size();
mark.offset = stream_it != streams.end() ? stream_it->second.plain_offset + stream_it->second.plain.count() : 0;
mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count();
out_marks.emplace_back(file.column_index, mark);
}, {});
@ -353,12 +359,13 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
auto it_inserted = streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size);
if (!it_inserted.second)
if (written_streams.count(stream_name))
return nullptr;
return &it_inserted.first->second.compressed;
auto it = streams.find(stream_name);
if (streams.end() == it)
throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
return &it->second.compressed;
};
type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, 0, 0, true, {});
@ -366,6 +373,9 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (!written_streams.emplace(stream_name).second)
return;
auto it = streams.find(stream_name);
if (streams.end() == it)
throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
@ -401,7 +411,7 @@ StorageLog::StorageLog(
size_t max_compress_block_size_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_), columns(columns_),
loaded_marks(false), max_compress_block_size(max_compress_block_size_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(name) + '/' + "sizes.json")
{
if (columns->empty())
@ -484,12 +494,6 @@ void StorageLog::loadMarks()
}
size_t StorageLog::marksCount()
{
return files.begin()->second.marks.size();
}
void StorageLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
std::unique_lock<std::shared_mutex> lock(rwlock);

View File

@ -68,7 +68,7 @@ private:
*/
struct Mark
{
size_t rows; /// How many rows are before this offset.
size_t rows; /// How many rows are before this offset including the block at this offset.
size_t offset; /// The offset in compressed file.
};
@ -95,7 +95,7 @@ private:
/// The order of adding files should not change: it corresponds to the order of the columns in the marks file.
void addFiles(const String & column_name, const IDataType & type);
bool loaded_marks;
bool loaded_marks = false;
size_t max_compress_block_size;
size_t file_count = 0;
@ -107,11 +107,6 @@ private:
/// You can not call with a write locked `rwlock`.
void loadMarks();
/// Can be called with any state of `rwlock`.
size_t marksCount();
void loadMarksImpl(bool load_null_marks);
/// The order of adding files should not change: it corresponds to the order of the columns in the marks file.
void addFile(const String & column_name, const IDataType & type, size_t level = 0);

View File

@ -0,0 +1,14 @@
Build clickhouse without tcmalloc by passing this option to cmake: cmake -D ENABLE_TCMALLOC=0
Copy clickhouse binary to your server.
scp dbms/src/Server/clickhouse server:~
ssh to your server
Stop clickhouse:
sudo service clickhouse-server stop
Run clickhouse with heap profiler from the terminal:
sudo -u clickhouse LD_PRELOAD=/usr/lib/libtcmalloc.so HEAPPROFILE=/var/log/clickhouse-server/heap.hprof ./clickhouse server --config /etc/clickhouse-server/config.xml
Profiles will appear in /var/log/clickhouse-server/

View File

@ -18,11 +18,7 @@ SELECT d, k, m.k1ID, m.k2Key, m.k3Type, m.s FROM test.summing_composite_key ARRA
SELECT d, k, m.k1ID, m.k2Key, m.k3Type, sum(m.s) FROM test.summing_composite_key ARRAY JOIN SecondMap AS m GROUP BY d, k, m.k1ID, m.k2Key, m.k3Type ORDER BY d, k, m.k1ID, m.k2Key, m.k3Type;
SELECT d, k, m.k1ID, m.k2Key, m.k3Type, m.s FROM test.summing_composite_key FINAL ARRAY JOIN SecondMap AS m ORDER BY d, k, m.k1ID, m.k2Key, m.k3Type, m.s;
OPTIMIZE TABLE test.summing_composite_key;
OPTIMIZE TABLE test.summing_composite_key;
OPTIMIZE TABLE test.summing_composite_key;
OPTIMIZE TABLE test.summing_composite_key;
OPTIMIZE TABLE test.summing_composite_key;
OPTIMIZE TABLE test.summing_composite_key PARTITION 200001 FINAL;
SELECT * FROM test.summing_composite_key ORDER BY d, k, _part_index;

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (1.1.54312) unstable; urgency=low
clickhouse (1.1.54314) unstable; urgency=low
* Modified source code
-- <robot-metrika-test@yandex-team.ru> Wed, 15 Nov 2017 16:12:02 +0300
-- <robot-metrika-test@yandex-team.ru> Tue, 28 Nov 2017 13:25:47 +0300

View File

@ -66,7 +66,7 @@ if (USE_JEMALLOC)
message (STATUS "Link jemalloc : ${JEMALLOC_LIBRARIES}")
set (MALLOC_LIBRARIES ${JEMALLOC_LIBRARIES})
elseif (USE_TCMALLOC)
if (DEBUG_LIBTCMALLOC)
if (DEBUG_TCMALLOC)
message (STATUS "Link libtcmalloc_minimal_debug for testing: ${GPERFTOOLS_TCMALLOC_MINIMAL_DEBUG}")
set (MALLOC_LIBRARIES ${GPERFTOOLS_TCMALLOC_MINIMAL_DEBUG})
else ()

View File

@ -5,16 +5,16 @@ else ()
endif ()
if (CMAKE_SYSTEM MATCHES "FreeBSD")
option (ENABLE_LIBTCMALLOC "Set to TRUE to enable libtcmalloc" OFF)
option (ENABLE_TCMALLOC "Set to TRUE to enable tcmalloc" OFF)
else ()
option (ENABLE_LIBTCMALLOC "Set to TRUE to enable libtcmalloc" ON)
option (ENABLE_TCMALLOC "Set to TRUE to enable tcmalloc" ON)
endif ()
option (DEBUG_LIBTCMALLOC "Set to TRUE to use debug version of libtcmalloc" OFF)
option (DEBUG_TCMALLOC "Set to TRUE to use debug version of libtcmalloc" OFF)
if (ENABLE_LIBTCMALLOC)
if (ENABLE_TCMALLOC)
#contrib/libtcmalloc doesnt build debug version, try find in system
if (DEBUG_LIBTCMALLOC OR NOT USE_INTERNAL_GPERFTOOLS_LIBRARY)
if (DEBUG_TCMALLOC OR NOT USE_INTERNAL_GPERFTOOLS_LIBRARY)
find_package (Gperftools)
endif ()

View File

@ -1,3 +1,4 @@
find_package (Threads)
add_executable (clickhouse-compressor main.cpp)
target_link_libraries (clickhouse-compressor clickhouse-compressor-lib)
@ -5,4 +6,4 @@ target_link_libraries (clickhouse-compressor clickhouse-compressor-lib)
install (TARGETS clickhouse-compressor RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse-compressor)
add_executable (zstd_test zstd_test.cpp)
target_link_libraries (zstd_test ${ZSTD_LIBRARY})
target_link_libraries (zstd_test ${ZSTD_LIBRARY} Threads::Threads)