Merge branch 'master' of github.com:yandex/ClickHouse into ubsan-fixes-3

This commit is contained in:
Alexey Milovidov 2018-12-26 22:38:11 +03:00
commit 7509db544c
12 changed files with 43 additions and 96 deletions

View File

@ -20,23 +20,21 @@ ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Even
ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coordination::WatchCallback caller_watch_callback)
{
zkutil::ZooKeeperPtr zookeeper;
std::unordered_set<std::string> invalidated_paths;
{
std::lock_guard<std::mutex> lock(context->mutex);
if (!context->zookeeper)
if (context->all_paths_invalidated)
{
/// Possibly, there was a previous session and it has expired. Clear the cache.
path_to_cached_znode.clear();
context->zookeeper = get_zookeeper();
context->all_paths_invalidated = false;
}
zookeeper = context->zookeeper;
invalidated_paths.swap(context->invalidated_paths);
}
zkutil::ZooKeeperPtr zookeeper = get_zookeeper();
if (!zookeeper)
throw DB::Exception("Could not get znode: `" + path + "'. ZooKeeper not configured.", DB::ErrorCodes::NO_ZOOKEEPER);
@ -65,8 +63,8 @@ ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coor
changed = owned_context->invalidated_paths.emplace(response.path).second;
else if (response.state == Coordination::EXPIRED_SESSION)
{
owned_context->zookeeper = nullptr;
owned_context->invalidated_paths.clear();
owned_context->all_paths_invalidated = true;
changed = true;
}
}

View File

@ -53,8 +53,8 @@ private:
struct Context
{
std::mutex mutex;
zkutil::ZooKeeperPtr zookeeper;
std::unordered_set<std::string> invalidated_paths;
bool all_paths_invalidated = false;
};
std::shared_ptr<Context> context;

View File

@ -61,7 +61,7 @@ DNSCacheUpdater::DNSCacheUpdater(Context & context_)
task_handle = pool.addTask([this] () { return run(); });
}
bool DNSCacheUpdater::run()
BackgroundProcessingPoolTaskResult DNSCacheUpdater::run()
{
/// TODO: Ensusre that we get global counter (not thread local)
auto num_current_network_exceptions = ProfileEvents::global_counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed);
@ -79,20 +79,20 @@ bool DNSCacheUpdater::run()
last_num_network_erros = num_current_network_exceptions;
last_update_time = time(nullptr);
return true;
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
catch (...)
{
/// Do not increment ProfileEvents::NetworkErrors twice
if (isNetworkError())
return false;
return BackgroundProcessingPoolTaskResult::ERROR;
throw;
}
}
/// According to BackgroundProcessingPool logic, if task has done work, it could be executed again immediately.
return false;
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
}
DNSCacheUpdater::~DNSCacheUpdater()

View File

@ -11,6 +11,7 @@ namespace DB
class Context;
class BackgroundProcessingPool;
class BackgroundProcessingPoolTaskInfo;
enum class BackgroundProcessingPoolTaskResult;
/// Add a task to BackgroundProcessingPool that watch for ProfileEvents::NetworkErrors and updates DNS cache if it has increased
@ -25,7 +26,7 @@ public:
static bool incrementNetworkErrorEventsIfNeeded();
private:
bool run();
BackgroundProcessingPoolTaskResult run();
Context & context;
BackgroundProcessingPool & pool;

View File

@ -337,56 +337,6 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block &
}
}
bool ExpressionAnalyzer::isThereArrayJoin(const ASTPtr & ast)
{
if (typeid_cast<ASTIdentifier *>(ast.get()))
{
return false;
}
else if (ASTFunction * node = typeid_cast<ASTFunction *>(ast.get()))
{
if (node->name == "arrayJoin")
{
return true;
}
if (functionIsInOrGlobalInOperator(node->name))
{
return isThereArrayJoin(node->arguments->children.at(0));
}
if (node->name == "indexHint")
{
return false;
}
if (AggregateFunctionFactory::instance().isAggregateFunctionName(node->name))
{
return false;
}
for (auto & child : node->arguments->children)
{
if (isThereArrayJoin(child))
{
return true;
}
}
return false;
}
else if (typeid_cast<ASTLiteral *>(ast.get()))
{
return false;
}
else
{
for (auto & child : ast->children)
{
if (isThereArrayJoin(child))
{
return true;
}
}
return false;
}
}
void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_subqueries, ExpressionActionsPtr & actions, bool only_consts)
{
@ -1124,9 +1074,4 @@ void ExpressionAnalyzer::collectUsedColumns()
}
Names ExpressionAnalyzer::getRequiredSourceColumns() const
{
return source_columns.getNames();
}
}

View File

@ -150,7 +150,7 @@ public:
/** Get a set of columns that are enough to read from the table to evaluate the expression.
* Columns added from another table by JOIN are not counted.
*/
Names getRequiredSourceColumns() const;
Names getRequiredSourceColumns() const { return source_columns.getNames(); }
/** These methods allow you to build a chain of transformations over a block, that receives values in the desired sections of the query.
*
@ -244,8 +244,6 @@ private:
void addJoinAction(ExpressionActionsPtr & actions, bool only_types) const;
bool isThereArrayJoin(const ASTPtr & ast);
/// If ast is ASTSelectQuery with JOIN, add actions for JOIN key columns.
void getActionsFromJoinKeys(const ASTTableJoin & table_join, bool no_subqueries, ExpressionActionsPtr & actions);

View File

@ -25,6 +25,7 @@ namespace DB
static constexpr double thread_sleep_seconds = 10;
static constexpr double thread_sleep_seconds_random_part = 1.0;
static constexpr double thread_sleep_seconds_if_nothing_to_do = 0.1;
/// For exponential backoff.
static constexpr double task_sleep_seconds_when_no_work_min = 10;
@ -146,7 +147,7 @@ void BackgroundProcessingPool::threadFunction()
while (!shutdown)
{
bool done_work = false;
TaskResult task_result = TaskResult::ERROR;
TaskHandle task;
try
@ -198,7 +199,7 @@ void BackgroundProcessingPool::threadFunction()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundPoolTask};
done_work = task->function();
task_result = task->function();
}
}
catch (...)
@ -216,7 +217,7 @@ void BackgroundProcessingPool::threadFunction()
if (task->removed)
continue;
if (done_work)
if (task_result == TaskResult::SUCCESS)
task->count_no_work_done = 0;
else
++task->count_no_work_done;
@ -225,11 +226,13 @@ void BackgroundProcessingPool::threadFunction()
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute; /// current time
if (!done_work)
if (task_result == TaskResult::ERROR)
next_time_to_execute += 1000000 * (std::min(
task_sleep_seconds_when_no_work_max,
task_sleep_seconds_when_no_work_min * std::pow(task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done))
+ std::uniform_real_distribution<double>(0, task_sleep_seconds_when_no_work_random_part)(rng));
else if (task_result == TaskResult::NOTHING_TO_DO)
next_time_to_execute += 1000000 * thread_sleep_seconds_if_nothing_to_do;
tasks.erase(task->iterator);
task->iterator = tasks.emplace(next_time_to_execute, task);

View File

@ -21,6 +21,12 @@ namespace DB
class BackgroundProcessingPool;
class BackgroundProcessingPoolTaskInfo;
enum class BackgroundProcessingPoolTaskResult
{
SUCCESS,
ERROR,
NOTHING_TO_DO,
};
/** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop.
* In this case, one task can run simultaneously from different threads.
* Designed for tasks that perform continuous background work (for example, merge).
@ -31,11 +37,13 @@ class BackgroundProcessingPool
{
public:
/// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task.
using Task = std::function<bool()>;
using TaskResult = BackgroundProcessingPoolTaskResult;
using Task = std::function<TaskResult()>;
using TaskInfo = BackgroundProcessingPoolTaskInfo;
using TaskHandle = std::shared_ptr<TaskInfo>;
BackgroundProcessingPool(int size_);
size_t getNumberOfThreads() const

View File

@ -588,13 +588,13 @@ bool StorageMergeTree::tryMutatePart()
}
bool StorageMergeTree::backgroundTask()
BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
{
if (shutdown_called)
return false;
return BackgroundProcessingPoolTaskResult::ERROR;
if (merger_mutator.actions_blocker.isCancelled())
return false;
return BackgroundProcessingPoolTaskResult::ERROR;
try
{
@ -608,16 +608,19 @@ bool StorageMergeTree::backgroundTask()
///TODO: read deduplicate option from table config
if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
return true;
return BackgroundProcessingPoolTaskResult::SUCCESS;
return tryMutatePart();
if (tryMutatePart())
return BackgroundProcessingPoolTaskResult::SUCCESS;
else
return BackgroundProcessingPoolTaskResult::ERROR;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(log, e.message());
return false;
return BackgroundProcessingPoolTaskResult::ERROR;
}
throw;

View File

@ -137,7 +137,7 @@ private:
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool tryMutatePart();
bool backgroundTask();
BackgroundProcessingPoolTaskResult backgroundTask();
Int64 getCurrentMutationVersion(
const MergeTreeData::DataPartPtr & part,

View File

@ -2055,13 +2055,13 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask()
}
bool StorageReplicatedMergeTree::queueTask()
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
/// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled())
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
/// This object will mark the element of the queue as running.
@ -2079,16 +2079,7 @@ bool StorageReplicatedMergeTree::queueTask()
LogEntryPtr & entry = selected.first;
if (!entry)
{
/// Nothing to do, we can sleep for some time, just not to
/// abuse background pool scheduling policy
std::this_thread::sleep_for(std::chrono::milliseconds(100));
/// If we return false, than background pool for this task
/// will accumulate exponential backoff and after empty replication queue
/// we will sleep for a long time
return true;
}
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
time_t prev_attempt_time = entry->last_attempt_time;
@ -2136,7 +2127,7 @@ bool StorageReplicatedMergeTree::queueTask()
bool need_sleep = !res && (entry->last_attempt_time - prev_attempt_time < 10);
/// If there was no exception, you do not need to sleep.
return !need_sleep;
return need_sleep ? BackgroundProcessingPoolTaskResult::ERROR : BackgroundProcessingPoolTaskResult::SUCCESS;
}

View File

@ -427,7 +427,7 @@ private:
/** Performs actions from the queue.
*/
bool queueTask();
BackgroundProcessingPoolTaskResult queueTask();
/// Postcondition:
/// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)