Fixed code review issues

This commit is contained in:
Maksim Kita 2021-03-06 21:45:51 +03:00
parent 2d8632e5a9
commit 6414da73bf
5 changed files with 125 additions and 112 deletions

View File

@ -15,12 +15,12 @@
* and have to be initialized on demand.
* Two main properties of pool are allocated objects size and borrowed objects size.
* Allocated objects size is size of objects that are currently allocated by the pool.
* Borrowed objects size is size of objects that are borrowed from clients.
* Borrowed objects size is size of objects that are borrowed by clients.
* If max_size == 0 then pool has unlimited size and objects will be allocated without limit.
*
* Pool provides following strategy for borrowing object:
* If max_size == 0 then pool has unlimited size and objects will be allocated without limit.
* 1. If pool has objects that can be increase borrowed objects size and return it.
* 1. If pool has objects that can be borrowed, increase borrowed objects size and return one of them.
* 2. If pool allocatedObjectsSize is lower than max objects size or pool has unlimited size
* allocate new object, increase borrowed objects size and return it.
* 3. If pool is full wait on condition variable with or without timeout until some object
@ -41,7 +41,7 @@ public:
if (!objects.empty())
{
dest = borrowFromObjects();
dest = borrowFromObjects(lock);
return;
}
@ -49,12 +49,12 @@ public:
if (unlikely(has_unlimited_size) || allocated_objects_size < max_size)
{
dest = allocateObjectForBorrowing(std::forward<FactoryFunc>(func));
dest = allocateObjectForBorrowing(lock, std::forward<FactoryFunc>(func));
return;
}
condition_variable.wait(lock, [this] { return !objects.empty(); });
dest = borrowFromObjects();
dest = borrowFromObjects(lock);
}
/// Same as borrowObject function, but wait with timeout.
@ -66,7 +66,7 @@ public:
if (!objects.empty())
{
dest = borrowFromObjects();
dest = borrowFromObjects(lock);
return true;
}
@ -74,20 +74,20 @@ public:
if (unlikely(has_unlimited_size) || allocated_objects_size < max_size)
{
dest = allocateObjectForBorrowing(std::forward<FactoryFunc>(func));
dest = allocateObjectForBorrowing(lock, std::forward<FactoryFunc>(func));
return true;
}
bool wait_result = condition_variable.wait_for(lock, std::chrono::milliseconds(timeout_in_milliseconds), [this] { return !objects.empty(); });
if (wait_result)
dest = borrowFromObjects();
dest = borrowFromObjects(lock);
return wait_result;
}
/// Return object into pool. Client must return same object that was borrowed.
ALWAYS_INLINE inline void returnObject(T && object_to_return)
inline void returnObject(T && object_to_return)
{
std::unique_lock<std::mutex> lck(objects_mutex);
@ -98,20 +98,20 @@ public:
}
/// Max pool size
ALWAYS_INLINE inline size_t maxSize() const
inline size_t maxSize() const
{
return max_size;
}
/// Allocated objects size by the pool. If allocatedObjectsSize == maxSize then pool is full.
ALWAYS_INLINE inline size_t allocatedObjectsSize() const
inline size_t allocatedObjectsSize() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
return allocated_objects_size;
}
/// Returns allocatedObjectsSize == maxSize
ALWAYS_INLINE inline bool isFull() const
inline bool isFull() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
return allocated_objects_size == max_size;
@ -119,7 +119,7 @@ public:
/// Borrowed objects size. If borrowedObjectsSize == allocatedObjectsSize and pool is full,
/// then client will wait during borrowObject function call.
ALWAYS_INLINE inline size_t borrowedObjectsSize() const
inline size_t borrowedObjectsSize() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
return borrowed_objects_size;
@ -128,7 +128,7 @@ public:
private:
template <typename FactoryFunc>
ALWAYS_INLINE inline T allocateObjectForBorrowing(FactoryFunc && func)
inline T allocateObjectForBorrowing(const std::unique_lock<std::mutex> &, FactoryFunc && func)
{
++allocated_objects_size;
++borrowed_objects_size;
@ -136,7 +136,7 @@ private:
return std::forward<FactoryFunc>(func)();
}
ALWAYS_INLINE inline T borrowFromObjects()
inline T borrowFromObjects(const std::unique_lock<std::mutex> &)
{
T dst;
detail::moveOrCopyIfThrow(std::move(objects.back()), dst);
@ -147,11 +147,6 @@ private:
return dst;
}
ALWAYS_INLINE inline bool hasUnlimitedSize() const
{
return max_size == 0;
}
size_t max_size;
mutable std::mutex objects_mutex;

View File

@ -94,47 +94,40 @@ bool ShellCommand::tryWaitProcessWithTimeout(size_t timeout_in_seconds)
wait_called = true;
struct timespec interval {.tv_sec = 1, .tv_nsec = 0};
try
in.close();
out.close();
err.close();
if (timeout_in_seconds == 0)
{
in.close();
out.close();
err.close();
/// If there is no timeout before signal, try waitpid once without blocking so we can avoid sending
/// the signal if the process has already terminated normally.
if (timeout_in_seconds == 0)
{
/// If there is no timeout before signal try to waitpid 1 time without block so we can avoid sending
/// signal if process is already terminated normally finished.
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
return process_terminated_normally;
}
/// If timeout is positive try waitpid without block in loop until
/// process is normally terminated or waitpid return error
while (timeout_in_seconds != 0)
{
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
if (process_terminated_normally)
return true;
else if (waitpid_res == 0)
{
--timeout_in_seconds;
nanosleep(&interval, nullptr);
continue;
}
else if (waitpid_res == -1 && errno != EINTR)
return false;
}
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
return process_terminated_normally;
}
catch (...)
/// If timeout is positive, try waitpid without blocking in a loop until
/// the process terminates normally or waitpid returns an error.
while (timeout_in_seconds != 0)
{
return false;
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
if (process_terminated_normally)
return true;
else if (waitpid_res == 0)
{
--timeout_in_seconds;
nanosleep(&interval, nullptr);
continue;
}
else if (waitpid_res == -1 && errno != EINTR)
return false;
}
return false;

View File

@ -269,13 +269,13 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
bool check_config) -> DictionarySourcePtr
{
if (dict_struct.has_expressions)
throw Exception{"Dictionary source of type `executable` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `executable` does not support attribute expressions");
/// Executable dictionaries may execute arbitrary commands.
/// It's OK for dictionaries created by administrator from xml-file, but
/// may be dangerous for dictionaries created from DDL-queries.
if (check_config)
throw Exception("Dictionaries with executable dictionary source from DDL are not allowed", ErrorCodes::DICTIONARY_ACCESS_DENIED);
throw Exception(ErrorCodes::DICTIONARY_ACCESS_DENIED, "Dictionaries with executable dictionary source are not allowed to be created from DDL query");
Context context_local_copy = copyContextAndApplySettings(config_prefix, context, config);

View File

@ -25,32 +25,28 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int DICTIONARY_ACCESS_DENIED;
extern const int UNSUPPORTED_METHOD;
extern const int TIMEOUT_EXCEEDED;
extern const int CANNOT_READ_ALL_DATA;
}
ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Configuration & configuration_,
Block & sample_block_,
const Context & context_)
: log(&Poco::Logger::get("ExecutablePoolDictionarySource"))
, dict_struct{dict_struct_}
, implicit_key{config.getBool(config_prefix + ".implicit_key", false)}
, command{config.getString(config_prefix + ".command")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, format{config.getString(config_prefix + ".format")}
, pool_size(config.getUInt64(config_prefix + ".size"))
, command_termination_timeout(config.getUInt64(config_prefix + ".command_termination_timeout", 10))
, configuration{configuration_}
, sample_block{sample_block_}
, context(context_)
, context{context_}
/// If pool size == 0 then there are no size restrictions. The max size of a Poco semaphore has integer type.
, process_pool(std::make_shared<ProcessPool>(pool_size == 0 ? std::numeric_limits<int>::max() : pool_size))
, process_pool{std::make_shared<ProcessPool>(configuration.pool_size == 0 ? std::numeric_limits<int>::max() : configuration.pool_size)}
{
/// Remove keys from sample_block for implicit_key dictionary because
/// these columns will not be returned from source
/// Implicit key means that the source script will return only values,
/// and the correspondence to the requested keys is determined implicitly - by the order of rows in the result.
if (implicit_key)
if (configuration.implicit_key)
{
auto keys_names = dict_struct.getKeysNames();
@ -66,15 +62,10 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutableP
: log(&Poco::Logger::get("ExecutablePoolDictionarySource"))
, update_time{other.update_time}
, dict_struct{other.dict_struct}
, implicit_key{other.implicit_key}
, command{other.command}
, update_field{other.update_field}
, format{other.format}
, pool_size{other.pool_size}
, command_termination_timeout{other.command_termination_timeout}
, configuration{other.configuration}
, sample_block{other.sample_block}
, context(other.context)
, process_pool(std::make_shared<ProcessPool>(pool_size))
, context{other.context}
, process_pool{std::make_shared<ProcessPool>(configuration.pool_size)}
{
}
@ -115,9 +106,9 @@ namespace
{
send_data(command->in);
}
catch (const std::exception & ex)
catch (...)
{
LOG_ERROR(log, "Error during write into process input stream: ({})", ex.what());
tryLogCurrentException(log);
error_during_write = true;
}
})
@ -142,8 +133,9 @@ namespace
{
if (error_during_write)
{
command = nullptr;
return Block();
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error during write in command process");
}
if (current_read_rows == rows_to_read)
@ -156,10 +148,11 @@ namespace
block = stream->read();
current_read_rows += block.rows();
}
catch (const std::exception & ex)
catch (...)
{
LOG_ERROR(log, "Error during read from process output stream: ({})", ex.what());
tryLogCurrentException(log);
command = nullptr;
throw;
}
return block;
@ -168,20 +161,25 @@ namespace
void readPrefix() override
{
if (error_during_write)
return;
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error during write in command process");
}
stream->readPrefix();
}
void readSuffix() override
{
if (error_during_write)
command = nullptr;
else
stream->readSuffix();
if (thread.joinable())
thread.join();
if (error_during_write)
{
command = nullptr;
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error during write in command process");
}
else
stream->readSuffix();
}
String getName() const override { return "PoolWithBackgroundThread"; }
@ -218,26 +216,29 @@ BlockInputStreamPtr ExecutablePoolDictionarySource::loadKeys(const Columns & key
BlockInputStreamPtr ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
{
std::unique_ptr<ShellCommand> process;
process_pool->borrowObject(process, [this]()
bool result = process_pool->tryBorrowObject(process, [this]()
{
bool terminate_in_destructor = true;
ShellCommandDestructorStrategy strategy { terminate_in_destructor, command_termination_timeout };
auto shell_command = ShellCommand::execute(command, false, strategy);
ShellCommandDestructorStrategy strategy { terminate_in_destructor, configuration.command_termination_timeout };
auto shell_command = ShellCommand::execute(configuration.command, false, strategy);
return shell_command;
});
}, configuration.max_command_execution_time * 10000);
if (!result)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Could not get process from pool, max command execution timeout exceeded");
size_t rows_to_read = block.rows();
auto read_stream = context.getInputFormat(format, process->out, sample_block, rows_to_read);
auto read_stream = context.getInputFormat(configuration.format, process->out, sample_block, rows_to_read);
auto stream = std::make_unique<PoolBlockInputStreamWithBackgroundThread>(
process_pool, std::move(process), std::move(read_stream), rows_to_read, log,
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputStream(format, out, block.cloneEmpty());
auto output_stream = context.getOutputStream(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block);
});
if (implicit_key)
if (configuration.implicit_key)
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
else
return std::shared_ptr<PoolBlockInputStreamWithBackgroundThread>(stream.release());
@ -255,7 +256,7 @@ bool ExecutablePoolDictionarySource::supportsSelectiveLoad() const
bool ExecutablePoolDictionarySource::hasUpdateField() const
{
return !update_field.empty();
return !configuration.update_field.empty();
}
DictionarySourcePtr ExecutablePoolDictionarySource::clone() const
@ -265,7 +266,7 @@ DictionarySourcePtr ExecutablePoolDictionarySource::clone() const
std::string ExecutablePoolDictionarySource::toString() const
{
return "ExecutablePool size: " + std::to_string(pool_size) + " command: " + command;
return "ExecutablePool size: " + std::to_string(configuration.pool_size) + " command: " + configuration.command;
}
void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
@ -279,24 +280,43 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
bool check_config) -> DictionarySourcePtr
{
if (dict_struct.has_expressions)
throw Exception{"Dictionary source of type `executable_pool` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `executable_pool` does not support attribute expressions");
/// Executable dictionaries may execute arbitrary commands.
/// It's OK for dictionaries created by administrator from xml-file, but
/// may be dangerous for dictionaries created from DDL-queries.
if (check_config)
throw Exception("Dictionaries with executable pool dictionary source are not allowed to be created from DDL query", ErrorCodes::DICTIONARY_ACCESS_DENIED);
throw Exception(ErrorCodes::DICTIONARY_ACCESS_DENIED, "Dictionaries with executable pool dictionary source are not allowed to be created from DDL query");
Context context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
/** Currently the parallel parsing input format cannot read exactly max_block_size rows from input,
* so it will be blocked on ReadBufferFromFileDescriptor because this file descriptor represents a pipe that does not have eof.
*/
auto settings_no_parallel_parsing = context_local_copy.getSettings();
settings_no_parallel_parsing.input_format_parallel_parsing = false;
context_local_copy.setSettings(settings_no_parallel_parsing);
return std::make_unique<ExecutablePoolDictionarySource>(
dict_struct, config, config_prefix + ".executable_pool",
sample_block, context_local_copy);
String configuration_config_prefix = config_prefix + ".executable_pool";
size_t max_command_execution_time = config.getUInt64(configuration_config_prefix + ".max_command_execution_time", 10);
size_t max_execution_time_seconds = static_cast<size_t>(context.getSettings().max_execution_time.totalSeconds());
if (max_command_execution_time > max_execution_time_seconds)
max_command_execution_time = max_execution_time_seconds;
ExecutablePoolDictionarySource::Configuration configuration
{
.command = config.getString(configuration_config_prefix + ".command"),
.format = config.getString(configuration_config_prefix + ".format"),
.pool_size = config.getUInt64(configuration_config_prefix + ".size"),
.update_field = config.getString(configuration_config_prefix + ".update_field", ""),
.implicit_key = config.getBool(configuration_config_prefix + ".implicit_key", false),
.command_termination_timeout = config.getUInt64(configuration_config_prefix + ".command_termination_timeout", 10),
.max_command_execution_time = max_command_execution_time
};
return std::make_unique<ExecutablePoolDictionarySource>(dict_struct, configuration, sample_block, context_local_copy);
};
factory.registerSource("executable_pool", create_table_source);

View File

@ -26,10 +26,20 @@ using ProcessPool = BorrowedObjectPool<std::unique_ptr<ShellCommand>>;
class ExecutablePoolDictionarySource final : public IDictionarySource
{
public:
/// Settings of the executable pool dictionary source. All members are const.
/// The factory fills this struct with designated initializers, so the member order must be kept.
struct Configuration
{
/// Command passed to ShellCommand::execute when the pool starts a new process.
const String command;
/// Format passed to context.getInputFormat / context.getOutputStream for the process pipes.
const String format;
/// Read from the `size` config key. The main constructor treats 0 as "no size restriction".
const size_t pool_size;
/// Empty string means the source has no update field (hasUpdateField returns false).
const String update_field;
/// When true the source script returns only values: key columns are removed from the sample block
/// and rows are matched to the requested keys by their order in the result.
const bool implicit_key;
/// Passed to ShellCommandDestructorStrategy for processes started by the pool; the factory default is 10.
/// Presumably measured in seconds — TODO confirm against ShellCommand.
const size_t command_termination_timeout;
/// Bounds the wait for a free process in getStreamForBlock; the factory default is 10 and the value
/// is capped by the max_execution_time setting expressed in seconds.
/// NOTE(review): getStreamForBlock multiplies this by 10000 before passing it as a millisecond timeout —
/// confirm the factor (1000 would match seconds). Also confirm the cap when max_execution_time is 0.
const size_t max_command_execution_time;
};
ExecutablePoolDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Configuration & configuration_,
Block & sample_block_,
const Context & context_);
@ -64,12 +74,7 @@ private:
Poco::Logger * log;
time_t update_time = 0;
const DictionaryStructure dict_struct;
bool implicit_key;
const std::string command;
const std::string update_field;
const std::string format;
const size_t pool_size;
const size_t command_termination_timeout;
const Configuration configuration;
Block sample_block;
Context context;