Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2018-12-14 20:50:23 +03:00
commit c39a19c126
40 changed files with 259 additions and 279 deletions

View File

@ -1,11 +1,11 @@
# These strings are auto-changed by release_lib.sh:
set(VERSION_REVISION 54411 CACHE STRING "") # changed manually for tests
set(VERSION_REVISION 54412 CACHE STRING "") # changed manually for tests
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 15 CACHE STRING "")
set(VERSION_MINOR 16 CACHE STRING "")
set(VERSION_PATCH 0 CACHE STRING "")
set(VERSION_GITHASH add14ee00bf5695ffc01abc7f4c4a0ef1dbc1ba2 CACHE STRING "")
set(VERSION_DESCRIBE v18.15.0-testing CACHE STRING "")
set(VERSION_STRING 18.15.0 CACHE STRING "")
set(VERSION_GITHASH b9b48c646c253358340bd39fd57754e92f88cd8a CACHE STRING "")
set(VERSION_DESCRIBE v18.16.0-testing CACHE STRING "")
set(VERSION_STRING 18.16.0 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -1,6 +1,11 @@
#include <Columns/ColumnUnique.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
@ -101,3 +106,88 @@ TEST(column_unique, column_unique_unique_insert_range_with_overflow_Test)
ASSERT_EQ(std::to_string(max_val + i), add_keys->getDataAt(i).toString());
}
}
/// Shared implementation for the deserialize-from-arena tests below.
/// Inserts all values of `column` into a ColumnUnique and verifies that
/// serializeValueIntoArena / uniqueDeserializeAndInsertFromArena form a
/// lossless round trip, and that ColumnUnique's serialized bytes match
/// those produced by the ordinary column.
template <typename ColumnType>
void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const IDataType & data_type)
{
size_t num_values = column.size();
{
/// Check that serialization is reversible: values serialized from one
/// ColumnUnique can be deserialized into a second one and compare equal.
Arena arena;
auto column_unique_pattern = ColumnUnique<ColumnString>::create(data_type);
auto column_unique = ColumnUnique<ColumnString>::create(data_type);
auto idx = column_unique_pattern->uniqueInsertRangeFrom(column, 0, num_values);
/// `pos` is the arena write cursor shared across serialize calls.
const char * pos = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos);
const char * new_pos;
column_unique->uniqueDeserializeAndInsertFromArena(ref.data, new_pos);
/// The deserializer must consume exactly the bytes that were written.
ASSERT_EQ(new_pos - ref.data, ref.size) << "Deserialized data has different sizes at position " << i;
ASSERT_EQ(column_unique_pattern->getNestedNotNullableColumn()->getDataAt(idx->getUInt(i)),
column_unique->getNestedNotNullableColumn()->getDataAt(idx->getUInt(i)))
<< "Deserialized data is different from pattern at position " << i;
}
}
{
/// Check that ColumnUnique serializes each value byte-for-byte identically
/// to the ordinary (non-unique) column it was built from.
Arena arena_string;
Arena arena_lc;
auto column_unique = ColumnUnique<ColumnString>::create(data_type);
auto idx = column_unique->uniqueInsertRangeFrom(column, 0, num_values);
const char * pos_string = nullptr;
const char * pos_lc = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc);
ASSERT_EQ(ref_string, ref_lc) << "Serialized data is different from pattern at position " << i;
}
}
}
/// Round-trip test: one million short strings (only 1000 distinct values, so
/// the column is heavily duplicated) through a plain String ColumnUnique.
/// Fix: removed the unused `std::vector<size_t> indexes(num_values)` local,
/// which allocated ~8 MB per run and was never read or written.
TEST(column_unique, column_unique_unique_deserialize_from_arena_String_Test)
{
auto data_type = std::make_shared<DataTypeString>();
auto column_string = ColumnString::create();
size_t num_values = 1000000;
size_t mod_to = 1000;
for (size_t i = 0; i < num_values; ++i)
{
/// Repeat values so the column contains many duplicates of few unique keys.
String str = toString(i % mod_to);
column_string->insertData(str.data(), str.size());
}
column_unique_unique_deserialize_from_arena_impl(*column_string, *data_type);
}
/// Same round-trip test over Nullable(String): two of every three rows get a
/// non-zero null-mask entry (presumably marking them NULL, per ColumnNullable
/// convention — confirm against ColumnNullable docs), exercising the nullable
/// serialization path of ColumnUnique.
/// Fix: removed the unused `std::vector<size_t> indexes(num_values)` local,
/// which allocated ~8 MB per run and was never read or written.
TEST(column_unique, column_unique_unique_deserialize_from_arena_Nullable_String_Test)
{
auto data_type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
auto column_string = ColumnString::create();
auto null_mask = ColumnUInt8::create();
size_t num_values = 1000000;
size_t mod_to = 1000;
for (size_t i = 0; i < num_values; ++i)
{
String str = toString(i % mod_to);
column_string->insertData(str.data(), str.size());
null_mask->insertValue(i % 3 ? 1 : 0);
}
auto column = ColumnNullable::create(std::move(column_string), std::move(null_mask));
column_unique_unique_deserialize_from_arena_impl(*column, *data_type);
}

View File

@ -61,8 +61,6 @@
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
#define DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS 7500
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
/// Maximum number of HTTP connections between two endpoints

View File

@ -48,7 +48,7 @@ inline bool isLocal(const Cluster::Address & address, const Poco::Net::SocketAdd
/// Implementation of Cluster::Address class
Cluster::Address::Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix)
Cluster::Address::Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
UInt16 clickhouse_port = static_cast<UInt16>(config.getInt("tcp_port", 0));
@ -125,7 +125,7 @@ String Cluster::Address::toStringFull() const
/// Implementation of Clusters class
Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
updateClusters(config, settings, config_name);
}
@ -147,7 +147,7 @@ void Clusters::setCluster(const String & cluster_name, const std::shared_ptr<Clu
}
void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);
@ -174,7 +174,7 @@ Clusters::Impl Clusters::getContainer() const
/// Implementation of `Cluster` class
Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)
Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(cluster_name, config_keys);

View File

@ -16,7 +16,7 @@ namespace DB
class Cluster
{
public:
Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name);
Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name);
/// Construct a cluster by the names of shards and replicas.
/// Local are treated as well as remote ones if treat_local_as_remote is true.
@ -66,7 +66,7 @@ public:
Protocol::Secure secure = Protocol::Secure::Disable;
Address() = default;
Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port);
/// Returns 'escaped_host_name:port'
@ -178,7 +178,7 @@ using ClusterPtr = std::shared_ptr<Cluster>;
class Clusters
{
public:
Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
Clusters(const Clusters &) = delete;
Clusters & operator=(const Clusters &) = delete;
@ -186,7 +186,7 @@ public:
ClusterPtr getCluster(const std::string & cluster_name) const;
void setCluster(const String & cluster_name, const ClusterPtr & cluster);
void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
public:
using Impl = std::map<String, ClusterPtr>;

View File

@ -157,10 +157,10 @@ struct ContextShared
public:
size_t operator()(const Context::SessionKey & key) const
{
size_t seed = 0;
boost::hash_combine(seed, key.first);
boost::hash_combine(seed, key.second);
return seed;
SipHash hash;
hash.update(key.first);
hash.update(key.second);
return hash.get64();
}
};
@ -549,7 +549,7 @@ void Context::setConfig(const ConfigurationPtr & config)
shared->config = config;
}
Poco::Util::AbstractConfiguration & Context::getConfigRef() const
const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
auto lock = getLock();
return shared->config ? *shared->config : Poco::Util::Application::instance().config();
@ -1537,95 +1537,50 @@ Compiler & Context::getCompiler()
void Context::initializeSystemLogs()
{
auto lock = getLock();
system_logs = std::make_shared<SystemLogs>();
if (!global_context)
throw Exception("Logical error: no global context for system logs", ErrorCodes::LOGICAL_ERROR);
system_logs = std::make_shared<SystemLogs>(*global_context, getConfigRef());
}
QueryLog * Context::getQueryLog(bool create_if_not_exists)
QueryLog * Context::getQueryLog()
{
auto lock = getLock();
if (!system_logs)
if (!system_logs || !system_logs->query_log)
return nullptr;
if (!system_logs->query_log)
{
if (!create_if_not_exists)
return nullptr;
if (shared->shutdown_called)
throw Exception("Logical error: query log should be destroyed before tables shutdown", ErrorCodes::LOGICAL_ERROR);
if (!global_context)
throw Exception("Logical error: no global context for query log", ErrorCodes::LOGICAL_ERROR);
system_logs->query_log = createDefaultSystemLog<QueryLog>(*global_context, "system", "query_log", getConfigRef(), "query_log");
}
return system_logs->query_log.get();
}
QueryThreadLog * Context::getQueryThreadLog(bool create_if_not_exists)
QueryThreadLog * Context::getQueryThreadLog()
{
auto lock = getLock();
if (!system_logs)
if (!system_logs || !system_logs->query_thread_log)
return nullptr;
if (!system_logs->query_thread_log)
{
if (!create_if_not_exists)
return nullptr;
if (shared->shutdown_called)
throw Exception("Logical error: query log should be destroyed before tables shutdown", ErrorCodes::LOGICAL_ERROR);
if (!global_context)
throw Exception("Logical error: no global context for query thread log", ErrorCodes::LOGICAL_ERROR);
system_logs->query_thread_log = createDefaultSystemLog<QueryThreadLog>(
*global_context, "system", "query_thread_log", getConfigRef(), "query_thread_log");
}
return system_logs->query_thread_log.get();
}
PartLog * Context::getPartLog(const String & part_database, bool create_if_not_exists)
PartLog * Context::getPartLog(const String & part_database)
{
auto lock = getLock();
auto & config = getConfigRef();
if (!config.has("part_log"))
return nullptr;
/// System logs are shutting down.
if (!system_logs)
if (!system_logs || !system_logs->part_log)
return nullptr;
String database = config.getString("part_log.database", "system");
/// Will not log operations on system tables (including part_log itself).
/// It doesn't make sense and not allow to destruct PartLog correctly due to infinite logging and flushing,
/// and also make troubles on startup.
if (!part_database.empty() && part_database == database)
if (part_database == system_logs->part_log_database)
return nullptr;
if (!system_logs->part_log)
{
if (!create_if_not_exists)
return nullptr;
if (shared->shutdown_called)
throw Exception("Logical error: part log should be destroyed before tables shutdown", ErrorCodes::LOGICAL_ERROR);
if (!global_context)
throw Exception("Logical error: no global context for part log", ErrorCodes::LOGICAL_ERROR);
system_logs->part_log = createDefaultSystemLog<PartLog>(*global_context, "system", "part_log", getConfigRef(), "part_log");
}
return system_logs->part_log.get();
}

View File

@ -168,7 +168,7 @@ public:
/// Global application configuration settings.
void setConfig(const ConfigurationPtr & config);
Poco::Util::AbstractConfiguration & getConfigRef() const;
const Poco::Util::AbstractConfiguration & getConfigRef() const;
/** Take the list of users, quotas and configuration profiles from this config.
* The list of users is completely replaced.
@ -389,12 +389,12 @@ public:
void initializeSystemLogs();
/// Nullptr if the query log is not ready for this moment.
QueryLog * getQueryLog(bool create_if_not_exists = true);
QueryThreadLog * getQueryThreadLog(bool create_if_not_exists = true);
QueryLog * getQueryLog();
QueryThreadLog * getQueryThreadLog();
/// Returns an object used to log operations with parts if it is possible.
/// Provide table name to make required checks.
PartLog * getPartLog(const String & part_database, bool create_if_not_exists = true);
PartLog * getPartLog(const String & part_database);
const MergeTreeSettings & getMergeTreeSettings() const;

View File

@ -15,7 +15,7 @@ class ISecurityManager
public:
using UserPtr = std::shared_ptr<const User>;
virtual void loadFromConfig(Poco::Util::AbstractConfiguration & config) = 0;
virtual void loadFromConfig(const Poco::Util::AbstractConfiguration & config) = 0;
/// Find user and make authorize checks
virtual UserPtr authorizeAndGetUser(

View File

@ -206,9 +206,9 @@ BlockIO InterpreterSystemQuery::execute()
break;
case Type::FLUSH_LOGS:
executeCommandsAndThrowIfError(
[&] () { if (auto query_log = context.getQueryLog(false)) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("", false)) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog(false)) query_thread_log->flush(); }
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); }
);
break;
case Type::STOP_LISTEN_QUERIES:

View File

@ -23,7 +23,7 @@ namespace ErrorCodes
template <typename Counter>
void QuotaValues<Counter>::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
void QuotaValues<Counter>::initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config)
{
queries = config.getUInt64(config_elem + ".queries", 0);
errors = config.getUInt64(config_elem + ".errors", 0);
@ -34,11 +34,12 @@ void QuotaValues<Counter>::initFromConfig(const String & config_elem, Poco::Util
execution_time_usec = config.getUInt64(config_elem + ".execution_time", 0) * 1000000ULL;
}
template void QuotaValues<size_t>::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
template void QuotaValues<std::atomic<size_t>>::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
template void QuotaValues<size_t>::initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);
template void QuotaValues<std::atomic<size_t>>::initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);
void QuotaForInterval::initFromConfig(const String & config_elem, time_t duration_, bool randomize_, time_t offset_, Poco::Util::AbstractConfiguration & config)
void QuotaForInterval::initFromConfig(
const String & config_elem, time_t duration_, bool randomize_, time_t offset_, const Poco::Util::AbstractConfiguration & config)
{
rounded_time.store(0, std::memory_order_relaxed);
duration = duration_;
@ -160,7 +161,7 @@ void QuotaForInterval::check(
}
void QuotaForIntervals::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config, pcg64 & rng)
void QuotaForIntervals::initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, pcg64 & rng)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
@ -251,7 +252,7 @@ String QuotaForIntervals::toString() const
}
void Quota::loadFromConfig(const String & config_elem, const String & name_, Poco::Util::AbstractConfiguration & config, pcg64 & rng)
void Quota::loadFromConfig(const String & config_elem, const String & name_, const Poco::Util::AbstractConfiguration & config, pcg64 & rng)
{
name = name_;
@ -307,7 +308,7 @@ QuotaForIntervalsPtr Quota::get(const String & quota_key, const String & user_na
}
void Quotas::loadFromConfig(Poco::Util::AbstractConfiguration & config)
void Quotas::loadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
pcg64 rng;

View File

@ -63,7 +63,7 @@ struct QuotaValues
tuple() = std::make_tuple(0, 0, 0, 0, 0, 0, 0);
}
void initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
void initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);
bool operator== (const QuotaValues & rhs) const
{
@ -109,7 +109,7 @@ struct QuotaForInterval
QuotaForInterval() = default;
QuotaForInterval(time_t duration_) : duration(duration_) {}
void initFromConfig(const String & config_elem, time_t duration_, bool randomize_, time_t offset_, Poco::Util::AbstractConfiguration & config);
void initFromConfig(const String & config_elem, time_t duration_, bool randomize_, time_t offset_, const Poco::Util::AbstractConfiguration & config);
/// Increase current value.
void addQuery() noexcept;
@ -191,7 +191,7 @@ public:
return cont.empty();
}
void initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config, pcg64 & rng);
void initFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, pcg64 & rng);
/// Set maximum values (limits) from passed argument.
/// Remove intervals that do not exist in the argument. Add intervals from the argument that we don't have.
@ -241,7 +241,7 @@ struct Quota
bool keyed_by_ip = false;
void loadFromConfig(const String & config_elem, const String & name_, Poco::Util::AbstractConfiguration & config, pcg64 & rng);
void loadFromConfig(const String & config_elem, const String & name_, const Poco::Util::AbstractConfiguration & config, pcg64 & rng);
QuotaForIntervalsPtr get(const String & quota_key, const String & user_name, const Poco::Net::IPAddress & ip);
};
@ -254,7 +254,7 @@ private:
Container cont;
public:
void loadFromConfig(Poco::Util::AbstractConfiguration & config);
void loadFromConfig(const Poco::Util::AbstractConfiguration & config);
QuotaForIntervalsPtr get(const String & name, const String & quota_key,
const String & user_name, const Poco::Net::IPAddress & ip);
};

View File

@ -29,7 +29,7 @@ namespace ErrorCodes
using UserPtr = SecurityManager::UserPtr;
void SecurityManager::loadFromConfig(Poco::Util::AbstractConfiguration & config)
void SecurityManager::loadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
Container new_users;

View File

@ -17,7 +17,7 @@ private:
Container users;
public:
void loadFromConfig(Poco::Util::AbstractConfiguration & config) override;
void loadFromConfig(const Poco::Util::AbstractConfiguration & config) override;
UserPtr authorizeAndGetUser(
const String & user_name,

View File

@ -105,7 +105,7 @@ bool Settings::tryGet(const String & name, String & value) const
/** Set the settings from the profile (in the server configuration, many settings can be listed in one profile).
* The profile can also be set using the `set` functions, like the `profile` setting.
*/
void Settings::setProfile(const String & profile_name, Poco::Util::AbstractConfiguration & config)
void Settings::setProfile(const String & profile_name, const Poco::Util::AbstractConfiguration & config)
{
String elem = "profiles." + profile_name;

View File

@ -188,7 +188,7 @@ struct Settings
M(SettingBool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.") \
M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \
M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \
M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS, "Timeout for flushing data from streaming storages.") \
M(SettingMilliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.") \
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \
@ -322,7 +322,7 @@ struct Settings
/** Set multiple settings from "profile" (in server configuration file (users.xml), profiles contain groups of multiple settings).
* The profile can also be set using the `set` functions, like the profile setting.
*/
void setProfile(const String & profile_name, Poco::Util::AbstractConfiguration & config);
void setProfile(const String & profile_name, const Poco::Util::AbstractConfiguration & config);
/// Load settings from configuration file, at "path" prefix in configuration.
void loadSettingsFromConfig(const String & path, const Poco::Util::AbstractConfiguration & config);

View File

@ -3,10 +3,22 @@
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/PartLog.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
/// Eagerly creates all system logs (query_log, query_thread_log, part_log)
/// at once, using table/engine/flush settings from the server configuration.
SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config)
{
query_log = createDefaultSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log");
query_thread_log = createDefaultSystemLog<QueryThreadLog>(global_context, "system", "query_thread_log", config, "query_thread_log");
part_log = createDefaultSystemLog<PartLog>(global_context, "system", "part_log", config, "part_log");
/// Remembered so that operations on tables in part_log's own database can be
/// excluded from logging (compared against in Context::getPartLog).
part_log_database = config.getString("part_log.database", "system");
}
SystemLogs::~SystemLogs() = default;
}

View File

@ -63,11 +63,14 @@ class PartLog;
/// because SystemLog destruction makes insert query while flushing data into underlying tables
struct SystemLogs
{
SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config);
~SystemLogs();
std::unique_ptr<QueryLog> query_log; /// Used to log queries.
std::unique_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::unique_ptr<PartLog> part_log; /// Used to log operations with parts
String part_log_database;
};
@ -372,23 +375,25 @@ void SystemLog<LogElement>::prepareTable()
is_prepared = true;
}
/// Creates a system log with MergeTree engines using parameters from config
/// Creates a system log with MergeTree engine using parameters from config
template<typename TSystemLog>
std::unique_ptr<TSystemLog> createDefaultSystemLog(
Context & context_,
const String & default_database_name,
const String & default_table_name,
Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
Context & context,
const String & default_database_name,
const String & default_table_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
{
String database = config.getString(config_prefix + ".database", default_database_name);
String table = config.getString(config_prefix + ".table", default_table_name);
static constexpr size_t DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS = 7500;
String database = config.getString(config_prefix + ".database", default_database_name);
String table = config.getString(config_prefix + ".table", default_table_name);
String partition_by = config.getString(config_prefix + ".partition_by", "toYYYYMM(event_date)");
String engine = "ENGINE = MergeTree PARTITION BY (" + partition_by + ") ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024";
size_t flush_interval_milliseconds = config.getUInt64("query_log.flush_interval_milliseconds", DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS);
size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS);
return std::make_unique<TSystemLog>(context_, database, table, engine, flush_interval_milliseconds);
return std::make_unique<TSystemLog>(context, database, table, engine, flush_interval_milliseconds);
}

View File

@ -252,7 +252,7 @@ bool AddressPatterns::contains(const Poco::Net::IPAddress & addr) const
return false;
}
void AddressPatterns::addFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
void AddressPatterns::addFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
@ -276,7 +276,7 @@ void AddressPatterns::addFromConfig(const String & config_elem, Poco::Util::Abst
}
User::User(const String & name_, const String & config_elem, Poco::Util::AbstractConfiguration & config)
User::User(const String & name_, const String & config_elem, const Poco::Util::AbstractConfiguration & config)
: name(name_)
{
bool has_password = config.has(config_elem + ".password");

View File

@ -42,7 +42,7 @@ private:
public:
bool contains(const Poco::Net::IPAddress & addr) const;
void addFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
void addFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);
};
@ -65,7 +65,7 @@ struct User
using DatabaseSet = std::unordered_set<std::string>;
DatabaseSet databases;
User(const String & name_, const String & config_elem, Poco::Util::AbstractConfiguration & config);
User(const String & name_, const String & config_elem, const Poco::Util::AbstractConfiguration & config);
};

View File

@ -58,7 +58,7 @@ private:
throw Exception("Unknown compression method " + name, ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
Element(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
Element(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
min_part_size = config.getUInt64(config_prefix + ".min_part_size", 0);
min_part_size_ratio = config.getDouble(config_prefix + ".min_part_size_ratio", 0);
@ -80,7 +80,7 @@ private:
public:
CompressionSettingsSelector() {} /// Always returns the default method.
CompressionSettingsSelector(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
CompressionSettingsSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);

View File

@ -23,9 +23,14 @@ namespace CurrentMetrics
namespace DB
{
static constexpr double thread_sleep_seconds = 10;
static constexpr double thread_sleep_seconds_random_part = 1.0;
constexpr double BackgroundProcessingPool::sleep_seconds;
constexpr double BackgroundProcessingPool::sleep_seconds_random_part;
/// For exponential backoff.
static constexpr double task_sleep_seconds_when_no_work_min = 10;
static constexpr double task_sleep_seconds_when_no_work_max = 600;
static constexpr double task_sleep_seconds_when_no_work_multiplier = 1.1;
static constexpr double task_sleep_seconds_when_no_work_random_part = 1.0;
void BackgroundProcessingPoolTaskInfo::wake()
@ -137,7 +142,7 @@ void BackgroundProcessingPool::threadFunction()
CurrentThread::getMemoryTracker().setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
pcg64 rng(randomSeed());
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, thread_sleep_seconds_random_part)(rng)));
while (!shutdown)
{
@ -172,8 +177,8 @@ void BackgroundProcessingPool::threadFunction()
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(sleep_seconds
+ std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
std::chrono::duration<double>(thread_sleep_seconds
+ std::uniform_real_distribution<double>(0, thread_sleep_seconds_random_part)(rng)));
continue;
}
@ -183,7 +188,7 @@ void BackgroundProcessingPool::threadFunction()
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock, std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, thread_sleep_seconds_random_part * 1000000)(rng)));
}
std::shared_lock rlock(task->rwlock);
@ -205,16 +210,27 @@ void BackgroundProcessingPool::threadFunction()
if (shutdown)
break;
/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000);
{
std::unique_lock lock(tasks_mutex);
if (task->removed)
continue;
if (done_work)
task->count_no_work_done = 0;
else
++task->count_no_work_done;
/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute; /// current time
if (!done_work)
next_time_to_execute += 1000000 * (std::min(
task_sleep_seconds_when_no_work_max,
task_sleep_seconds_when_no_work_min * std::pow(task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done))
+ std::uniform_real_distribution<double>(0, task_sleep_seconds_when_no_work_random_part)(rng));
tasks.erase(task->iterator);
task->iterator = tasks.emplace(next_time_to_execute, task);
}

View File

@ -55,8 +55,6 @@ protected:
using Threads = std::vector<std::thread>;
const size_t size;
static constexpr double sleep_seconds = 10;
static constexpr double sleep_seconds_random_part = 1.0;
Tasks tasks; /// Ordered in priority.
std::mutex tasks_mutex;
@ -95,6 +93,9 @@ protected:
std::atomic<bool> removed {false};
std::multimap<Poco::Timestamp, std::shared_ptr<BackgroundProcessingPoolTaskInfo>>::iterator iterator;
/// For exponential backoff.
size_t count_no_work_done = 0;
};
}

View File

@ -68,7 +68,7 @@ public:
}
/// Change amount of reserved space. When new_size is greater than before, availability of free space is not checked.
void update(size_t new_size)
void update(UInt64 new_size)
{
std::lock_guard<std::mutex> lock(DiskSpaceMonitor::mutex);
DiskSpaceMonitor::reserved_bytes -= size;
@ -76,12 +76,12 @@ public:
DiskSpaceMonitor::reserved_bytes += size;
}
size_t getSize() const
UInt64 getSize() const
{
return size;
}
Reservation(size_t size_)
Reservation(UInt64 size_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size)
{
std::lock_guard<std::mutex> lock(DiskSpaceMonitor::mutex);
@ -90,23 +90,23 @@ public:
}
private:
size_t size;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
using ReservationPtr = std::unique_ptr<Reservation>;
static size_t getUnreservedFreeSpace(const std::string & path)
static UInt64 getUnreservedFreeSpace(const std::string & path)
{
struct statvfs fs;
if (statvfs(path.c_str(), &fs) != 0)
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
size_t res = fs.f_bfree * fs.f_bsize;
UInt64 res = fs.f_bfree * fs.f_bsize;
/// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df.
res -= std::min(res, static_cast<size_t>(30 * (1ul << 20)));
res -= std::min(res, static_cast<UInt64>(30 * (1ul << 20)));
std::lock_guard<std::mutex> lock(mutex);
@ -118,22 +118,22 @@ public:
return res;
}
static size_t getReservedSpace()
static UInt64 getReservedSpace()
{
std::lock_guard<std::mutex> lock(mutex);
return reserved_bytes;
}
static size_t getReservationCount()
static UInt64 getReservationCount()
{
std::lock_guard<std::mutex> lock(mutex);
return reservation_count;
}
/// If not enough (approximately) space, throw an exception.
static ReservationPtr reserve(const std::string & path, size_t size)
static ReservationPtr reserve(const std::string & path, UInt64 size)
{
size_t free_bytes = getUnreservedFreeSpace(path);
UInt64 free_bytes = getUnreservedFreeSpace(path);
if (free_bytes < size)
throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(free_bytes) + " available, "
+ formatReadableSizeWithBinarySuffix(size) + " requested", ErrorCodes::NOT_ENOUGH_SPACE);
@ -141,8 +141,8 @@ public:
}
private:
static size_t reserved_bytes;
static size_t reservation_count;
static UInt64 reserved_bytes;
static UInt64 reservation_count;
static std::mutex mutex;
};

View File

@ -1470,7 +1470,7 @@ void MergeTreeData::AlterDataPartTransaction::commit()
file.remove();
}
mutable_part.bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(path);
mutable_part.bytes_on_disk = new_checksums.getTotalSizeOnDisk();
/// TODO: we can skip resetting caches when the column is added.
data_part->storage.context.dropCaches();

View File

@ -494,8 +494,6 @@ void MergeTreeDataPart::loadIndex()
index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
}
void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
@ -529,16 +527,25 @@ void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
void MergeTreeDataPart::loadChecksums(bool require)
{
String path = getFullPath() + "checksums.txt";
if (!Poco::File(path).exists())
Poco::File checksums_file(path);
if (checksums_file.exists())
{
ReadBufferFromFile file = openForReading(path);
if (checksums.read(file))
{
assertEOF(file);
bytes_on_disk = checksums.getTotalSizeOnDisk();
}
else
bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
}
else
{
if (require)
throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
return;
bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
}
ReadBufferFromFile file = openForReading(path);
if (checksums.read(file))
assertEOF(file);
}
void MergeTreeDataPart::loadRowsCount()

View File

@ -94,6 +94,7 @@ struct MergeTreeDataPart
size_t marks_count = 0;
std::atomic<UInt64> bytes_on_disk {0}; /// 0 - if not counted;
/// Is used from several threads without locks (it is changed with ALTER).
/// May not contain size of checksums.txt and columns.txt
time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };

View File

@ -87,6 +87,14 @@ void MergeTreeDataPartChecksums::checkSizes(const String & path) const
}
}
UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const
{
UInt64 res = 0;
for (const auto & it : files)
res += it.second.file_size;
return res;
}
bool MergeTreeDataPartChecksums::read(ReadBuffer & in, size_t format_version)
{
switch (format_version)

View File

@ -84,6 +84,8 @@ struct MergeTreeDataPartChecksums
String getSerializedString() const;
static MergeTreeDataPartChecksums deserializeFrom(const String & s);
UInt64 getTotalSizeOnDisk() const;
};

View File

@ -12,7 +12,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void MergeTreeSettings::loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
void MergeTreeSettings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config)
{
if (!config.has(config_elem))
return;

View File

@ -167,7 +167,7 @@ struct MergeTreeSettings
#undef DECLARE
public:
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);
/// NOTE: will rewrite the AST to add immutable settings.
void loadFromQuery(ASTStorage & storage_def);

View File

@ -378,7 +378,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->columns = *total_column_list;
new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
new_part->checksums = checksums;
new_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_part->getFullPath());
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
}
void MergedBlockOutputStream::init()

View File

@ -424,7 +424,7 @@ bool StorageMergeTree::merge(
}
else
{
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
}

View File

@ -2948,7 +2948,6 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
/// (merge_selecting_thread or OPTIMIZE queries) could assign new merges.
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
auto zookeeper = getZooKeeper();
ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper);
@ -2967,6 +2966,8 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
for (const MergeTreeData::DataPartPtr & part : data_parts)
partition_ids.emplace(part->info.partition_id);
UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
for (const String & partition_id : partition_ids)
{
MergeTreeDataMergerMutator::FuturePart future_merged_part;
@ -2989,6 +2990,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
}
else
{
UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
String partition_id = data.getPartitionIDFromQuery(partition, context);
selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason);

View File

@ -1,93 +0,0 @@
#include <Storages/getStructureOfRemoteTable.h>
#include <Storages/StorageDistributed.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/getClusterName.h>
#include <Common/SipHash.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/TableFunctionShardByHash.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
StoragePtr TableFunctionShardByHash::executeImpl(const ASTPtr & ast_function, const Context & context) const
{
ASTs & args_func = typeid_cast<ASTFunction &>(*ast_function).children;
const char * err = "Table function 'shardByHash' requires 4 parameters: "
"cluster name, key string to hash, name of remote database, name of remote table.";
if (args_func.size() != 1)
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 4)
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String cluster_name;
String key;
String remote_database;
String remote_table;
auto getStringLiteral = [](const IAST & node, const char * description)
{
const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(&node);
if (!lit)
throw Exception(description + String(" must be string literal (in single quotes)."), ErrorCodes::BAD_ARGUMENTS);
if (lit->value.getType() != Field::Types::String)
throw Exception(description + String(" must be string literal (in single quotes)."), ErrorCodes::BAD_ARGUMENTS);
return safeGet<const String &>(lit->value);
};
cluster_name = getClusterName(*args[0]);
key = getStringLiteral(*args[1], "Key to hash");
args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], context);
args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(args[3], context);
remote_database = static_cast<const ASTLiteral &>(*args[2]).value.safeGet<String>();
remote_table = static_cast<const ASTLiteral &>(*args[3]).value.safeGet<String>();
/// Similar to other TableFunctions.
for (auto & arg : args)
if (ASTIdentifier * id = typeid_cast<ASTIdentifier *>(arg.get()))
id->setSpecial();
auto cluster = context.getCluster(cluster_name);
size_t shard_index = sipHash64(key) % cluster->getShardCount();
std::shared_ptr<Cluster> shard(cluster->getClusterWithSingleShard(shard_index).release());
auto res = StorageDistributed::createWithOwnCluster(
getName(),
getStructureOfRemoteTable(*shard, remote_database, remote_table, context),
remote_database,
remote_table,
shard,
context);
res->startup();
return res;
}
void registerTableFunctionShardByHash(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionShardByHash>();
}
}

View File

@ -1,23 +0,0 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
namespace DB
{
/* shardByHash(cluster, 'key', db, table) - creates a temporary StorageDistributed,
* using the cluster `cluster`, and selecting from it only one shard by hashing the string key.
*
* Similarly to the `remote` function, to get the table structure, a DESC TABLE request is made to the remote server.
*/
class TableFunctionShardByHash : public ITableFunction
{
public:
static constexpr auto name = "shardByHash";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
};
}

View File

@ -8,7 +8,6 @@ namespace DB
void registerTableFunctionMerge(TableFunctionFactory & factory);
void registerTableFunctionRemote(TableFunctionFactory & factory);
void registerTableFunctionShardByHash(TableFunctionFactory & factory);
void registerTableFunctionNumbers(TableFunctionFactory & factory);
void registerTableFunctionCatBoostPool(TableFunctionFactory & factory);
void registerTableFunctionFile(TableFunctionFactory & factory);
@ -35,7 +34,6 @@ void registerTableFunctions()
registerTableFunctionMerge(factory);
registerTableFunctionRemote(factory);
registerTableFunctionShardByHash(factory);
registerTableFunctionNumbers(factory);
registerTableFunctionCatBoostPool(factory);
registerTableFunctionFile(factory);

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (18.15.0) unstable; urgency=low
clickhouse (18.16.0) unstable; urgency=low
* Modified source code
-- <root@yandex-team.ru> Fri, 14 Dec 2018 16:47:44 +0300
-- <root@yandex-team.ru> Fri, 14 Dec 2018 20:26:45 +0300

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=18.15.0
ARG version=18.16.0
RUN apt-get update \
&& apt-get install --yes --no-install-recommends \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=18.15.0
ARG version=18.16.0
ARG gosu_ver=1.10
RUN apt-get update \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=18.15.0
ARG version=18.16.0
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \