Speed up partition check, add more performance output. [#CLICKHOUSE-2]

Faster partition check.

Added more debug info.
Vitaliy Lyudvichenko 2018-03-30 19:25:26 +03:00
parent 2ca7d486db
commit d25338582d
5 changed files with 105 additions and 26 deletions
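
For context on the "faster partition check": the ClusterCopier hunk below replaces per-path synchronous ZooKeeper reads with a batch of asyncGet() futures, so all status paths are requested up front and only collected afterwards. A minimal, self-contained sketch of that issue-then-collect pattern, with std::async and a made-up slow_get() standing in for the ZooKeeper client (all names here are illustrative, not the actual ClusterCopier code):

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <string>
    #include <thread>
    #include <vector>

    // Hypothetical stand-in for a blocking ZooKeeper get(); each call costs one round trip.
    static std::string slow_get(const std::string & path)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        return "Finished @ " + path;
    }

    int main()
    {
        std::vector<std::string> status_paths = {"/task/partition/shard_0", "/task/partition/shard_1"};

        // Issue every request first, then collect the results, so the total wait
        // is roughly one round trip instead of one round trip per path.
        std::vector<std::future<std::string>> futures;
        for (const auto & path : status_paths)
            futures.emplace_back(std::async(std::launch::async, slow_get, path));

        for (auto & future : futures)
            std::cout << future.get() << '\n';
    }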

View File

@@ -1,6 +1,9 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Common/Stopwatch.h>
#include <common/logger_useful.h>
#include <iomanip>
namespace DB
@@ -22,8 +25,29 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
size_t num_blocks = 0;
double total_blocks_time = 0;
size_t slowest_block_number = 0;
double slowest_block_time = 0;
while (true)
{
Stopwatch watch;
Block block = from.read();
double elapsed = watch.elapsedSeconds();
if (num_blocks == 0 || elapsed > slowest_block_time)
{
slowest_block_number = num_blocks;
slowest_block_time = elapsed;
}
total_blocks_time += elapsed;
++num_blocks;
if (!block)
break;
if (is_cancelled())
break;
@@ -47,8 +71,28 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
if (is_cancelled())
return;
from.readSuffix();
to.writeSuffix();
auto log = &Poco::Logger::get("copyData");
bool print_dbg = num_blocks > 2;
if (print_dbg)
{
LOG_DEBUG(log, "Read " << num_blocks << " blocks. It took " << std::fixed << total_blocks_time << " seconds, average "
<< std::fixed << total_blocks_time / num_blocks * 1000 << " ms, the slowest block #" << slowest_block_number
<< " was read for " << std::fixed << slowest_block_time * 1000 << " ms ");
}
{
Stopwatch watch;
to.writeSuffix();
if (num_blocks > 1)
LOG_DEBUG(log, "It took " << std::fixed << watch.elapsedSeconds() << " for writeSuffix()");
}
{
Stopwatch watch;
from.readSuffix();
if (num_blocks > 1)
LOG_DEBUG(log, "It took " << std::fixed << watch.elapsedSeconds() << " seconds for readSuffix()");
}
}

View File

@@ -699,15 +699,19 @@ void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & c
if (config.has(prefix + "settings_push"))
settings_push.loadSettingsFromConfig(prefix + "settings_push", config);
/// Override important settings
settings_pull.load_balancing = LoadBalancing::NEAREST_HOSTNAME;
settings_pull.readonly = 1;
settings_pull.max_threads = 1;
settings_pull.max_block_size = settings_pull.max_block_size.changed ? settings_pull.max_block_size.value : 8192UL;
settings_pull.preferred_block_size_bytes = 0;
auto set_default_value = [] (auto && setting, auto && default_value)
{
setting = setting.changed ? setting.value : default_value;
};
settings_push.insert_distributed_timeout = 0;
/// Override important settings
settings_pull.readonly = 1;
settings_push.insert_distributed_sync = 1;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
}
@@ -1097,22 +1101,27 @@ protected:
status_paths.emplace_back(task_shard_partition.getShardStatusPath());
}
zkutil::Stat stat;
std::vector<int64_t> zxid1, zxid2;
try
{
// Check that state is Finished and remember zxid
std::vector<zkutil::ZooKeeper::GetFuture> get_futures;
for (const String & path : status_paths)
get_futures.emplace_back(zookeeper->asyncGet(path));
// Check that state is Finished and remember zxid
for (auto & future : get_futures)
{
TaskStateWithOwner status = TaskStateWithOwner::fromString(zookeeper->get(path, &stat));
auto res = future.get();
TaskStateWithOwner status = TaskStateWithOwner::fromString(res.value);
if (status.state != TaskState::Finished)
{
LOG_INFO(log, "The task " << path << " is being rewritten by " << status.owner
<< ". Partition will be rechecked");
LOG_INFO(log, "The task " << res.value << " is being rewritten by " << status.owner << ". Partition will be rechecked");
return false;
}
zxid1.push_back(stat.pzxid);
zxid1.push_back(res.stat.pzxid);
}
// Check that partition is not dirty
@@ -1122,11 +1131,15 @@ protected:
return false;
}
get_futures.clear();
for (const String & path : status_paths)
get_futures.emplace_back(zookeeper->asyncGet(path));
// Remember zxid of states again
for (const auto & path : status_paths)
for (auto & future : get_futures)
{
zookeeper->exists(path, &stat);
zxid2.push_back(stat.pzxid);
auto res = future.get();
zxid2.push_back(res.stat.pzxid);
}
}
catch (const zkutil::KeeperException & e)
@@ -1664,7 +1677,7 @@ protected:
BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
input = std::make_shared<AsynchronousBlockInputStream>(io_select.in);
input = io_select.in;
output = io_insert.out;
}

View File

@@ -28,6 +28,7 @@
#include <ext/scope_guard.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/DateTimeFormatter.h>
#include <future>
#include <condition_variable>
@@ -353,35 +354,53 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
inserted_blocks += 1;
inserted_rows += block.rows();
last_block_finish_time = time(nullptr);
}
void DistributedBlockOutputStream::writeSuffix()
{
auto log_performance = [this] ()
{
double elapsed = watch.elapsedSeconds();
LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks"
<< ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second"
<< ". " << getCurrentStateDescription());
};
if (insert_sync && pool)
{
auto format_ts = [] (time_t ts) {
WriteBufferFromOwnString wb;
writeDateTimeText(ts, wb);
return wb.str();
};
LOG_DEBUG(log, "Writing suffix, the last block was at " << format_ts(last_block_finish_time));
finished_jobs_count = 0;
for (auto & shard_jobs : per_shard_jobs)
for (JobReplica & job : shard_jobs.replicas_jobs)
{
if (job.stream)
pool->schedule([&job] () { job.stream->writeSuffix(); });
{
pool->schedule([&job] () {
job.stream->writeSuffix();
});
}
}
try
{
pool->wait();
log_performance();
}
catch (Exception & exception)
{
log_performance();
exception.addMessage(getCurrentStateDescription());
throw;
}
double elapsed = watch.elapsedSeconds();
LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks"
<< ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second"
<< ". " << getCurrentStateDescription());
}
}

View File

@@ -12,6 +12,7 @@
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
namespace Poco
{
class Logger;
@@ -93,6 +94,7 @@ private:
std::optional<ThreadPool> pool;
ThrottlerPtr throttler;
String query_string;
time_t last_block_finish_time = 0;
struct JobReplica
{

View File

@@ -6,6 +6,7 @@
<!-- Common setting for pull and push operations -->
<settings>
<connect_timeout>1</connect_timeout>
<max_block_size>2</max_block_size>
</settings>
<settings_pull>