Merge pull request #2053 from yandex/small-enhancements

Small enhancements
This commit is contained in:
alexey-milovidov 2018-03-15 22:10:29 +03:00 committed by GitHub
commit b9339e0ea1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 75 additions and 27 deletions

View File

@ -38,7 +38,7 @@ int main(int argc, char ** argv)
ops.emplace_back(std::make_shared<zkutil::Op::Remove>("/test/zk_expiration_test", -1));
zkutil::MultiTransactionInfo info;
zk.tryMultiUnsafe(ops, info);
zk.tryMultiNoThrow(ops, nullptr, &info);
std::cout << time(nullptr) - time0 << "s: " << zkutil::ZooKeeper::error2string(info.code) << std::endl;
try

View File

@ -20,6 +20,7 @@
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Client/Connection.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
@ -37,7 +38,7 @@
#include <Common/escapeForFileName.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Storages/StorageDistributed.h>
#include <DataTypes/DataTypeString.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
@ -46,20 +47,20 @@
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Databases/DatabaseMemory.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <Common/isLocalAddress.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Server/StatusFile.h>
#include <Storages/registerStorages.h>
#include <Storages/StorageDistributed.h>
#include <Databases/DatabaseMemory.h>
#include <Server/StatusFile.h>
#include <Common/formatReadable.h>
#include <daemon/OwnPatternFormatter.h>
@ -697,7 +698,7 @@ void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & c
settings_pull.load_balancing = LoadBalancing::NEAREST_HOSTNAME;
settings_pull.readonly = 1;
settings_pull.max_threads = 1;
settings_pull.max_block_size = std::min(8192UL, settings_pull.max_block_size.value);
settings_pull.max_block_size = settings_pull.max_block_size.changed ? settings_pull.max_block_size.value : 8192UL;
settings_pull.preferred_block_size_bytes = 0;
settings_push.insert_distributed_timeout = 0;
@ -759,6 +760,7 @@ public:
/// Do not initialize tables, will make deferred initialization in process()
getZooKeeper()->createAncestors(getWorkersPathVersion() + "/");
getZooKeeper()->createAncestors(getWorkersPath() + "/");
}
@ -1011,32 +1013,65 @@ protected:
return task_cluster->task_zookeeper_path + "/task_active_workers";
}
String getWorkersPathVersion() const
{
return getWorkersPath() + "_version";
}
String getCurrentWorkerNodePath() const
{
return getWorkersPath() + "/" + host_id;
}
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper,
const String & description)
const String & description, bool unprioritized)
{
std::chrono::milliseconds current_sleep_time = default_sleep_time;
static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
if (unprioritized)
std::this_thread::sleep_for(current_sleep_time);
String workers_version_path = getWorkersPathVersion();
String workers_path = getWorkersPath();
String current_worker_path = getCurrentWorkerNodePath();
while (true)
{
zkutil::Stat stat;
zookeeper->get(getWorkersPath(), &stat);
zookeeper->get(workers_version_path, &stat);
auto version = stat.version;
zookeeper->get(workers_path, &stat);
if (static_cast<size_t>(stat.numChildren) >= task_cluster->max_workers)
{
LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
<< ". Postpone processing " << description);
std::this_thread::sleep_for(default_sleep_time);
updateConfigIfNeeded();
}
else
{
return std::make_shared<zkutil::EphemeralNodeHolder>(getCurrentWorkerNodePath(), *zookeeper, true, false, description);
zkutil::Ops ops;
ops.emplace_back(new zkutil::Op::SetData(workers_version_path, description, version));
ops.emplace_back(new zkutil::Op::Create(current_worker_path, description, zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral));
auto code = zookeeper->tryMulti(ops);
if (code == ZOK || code == ZNODEEXISTS)
return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
if (code == ZBADVERSION)
{
LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
}
else
throw zkutil::KeeperException(code);
}
if (unprioritized)
current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
std::this_thread::sleep_for(current_sleep_time);
updateConfigIfNeeded();
}
}
@ -1444,9 +1479,8 @@ protected:
return parseQuery(p_query, query);
};
/// Load balancing
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path);
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path, task_shard.priority.is_remote);
LOG_DEBUG(log, "Processing " << current_task_status_path);
@ -1609,9 +1643,16 @@ protected:
Context context_insert = context;
context_insert.getSettingsRef() = task_cluster->settings_push;
BlockInputStreamPtr input;
BlockOutputStreamPtr output;
{
BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
input = std::make_shared<AsynchronousBlockInputStream>(io_select.in);
output = io_insert.out;
}
using ExistsFuture = zkutil::ZooKeeper::ExistsFuture;
auto future_is_dirty_checker = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
@ -1665,7 +1706,7 @@ protected:
};
/// Main work is here
copyData(*io_select.in, *io_insert.out, cancel_check, update_stats);
copyData(*input, *output, cancel_check, update_stats);
// Just in case
if (future_is_dirty_checker != nullptr)

View File

@ -337,7 +337,10 @@ void DistributedBlockOutputStream::writeSuffix()
throw;
}
LOG_DEBUG(log, getCurrentStateDescription());
double elapsed = watch.elapsedSeconds();
LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << blocks_inserted << " blocks"
<< " (average " << std::fixed << std::setprecision(1) << elapsed / blocks_inserted * 1000 << " ms. per block)"
<< ". " << getCurrentStateDescription());
}
}

View File

@ -330,12 +330,12 @@ static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_colu
NamesAndTypesList & merging_columns, Names & merging_column_names
)
{
Names primary_key_columns_dup = primary_key_expressions->getRequiredColumns();
std::set<String> key_columns(primary_key_columns_dup.cbegin(), primary_key_columns_dup.cend());
Names primary_key_columns_vec = primary_key_expressions->getRequiredColumns();
std::set<String> key_columns(primary_key_columns_vec.cbegin(), primary_key_columns_vec.cend());
if (secondary_key_expressions)
{
Names secondary_key_columns_dup = secondary_key_expressions->getRequiredColumns();
key_columns.insert(secondary_key_columns_dup.begin(), secondary_key_columns_dup.end());
Names secondary_key_columns_vec = secondary_key_expressions->getRequiredColumns();
key_columns.insert(secondary_key_columns_vec.begin(), secondary_key_columns_vec.end());
}
/// Force sign column for Collapsing mode
@ -350,6 +350,10 @@ static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_colu
if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
key_columns.emplace(merging_params.sign_column);
/// Force to merge at least one column in case of empty key
if (key_columns.empty())
key_columns.emplace(all_columns.front().name);
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
for (auto & column : all_columns)

View File

@ -1,7 +1,7 @@
<?xml version="1.0"?>
<yandex>
<!-- How many simualteneous workers are posssible -->
<max_workers>4</max_workers>
<max_workers>3</max_workers>
<!-- Common setting for pull and push operations -->
<settings>

View File

@ -1,7 +1,7 @@
SELECT '*** MergeTree ***';
DROP TABLE IF EXISTS test.unsorted;
CREATE TABLE test.unsorted (x UInt32, y String) ENGINE MergeTree ORDER BY tuple();
CREATE TABLE test.unsorted (x UInt32, y String) ENGINE MergeTree ORDER BY tuple() SETTINGS vertical_merge_algorithm_min_rows_to_activate=0, vertical_merge_algorithm_min_columns_to_activate=0;
INSERT INTO test.unsorted VALUES (1, 'a'), (5, 'b');
INSERT INTO test.unsorted VALUES (2, 'c'), (4, 'd');