Push exception into pipe

This commit is contained in:
Alexey Milovidov 2021-07-19 19:46:58 +03:00
parent e1ddde71c0
commit c85b7ad556
2 changed files with 27 additions and 29 deletions

View File

@ -182,6 +182,32 @@ BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & en
return io;
}
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait)
{
auto output_mode = context_->getSettingsRef().distributed_ddl_output_mode;
auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
{
if (output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE)
return type;
return std::make_shared<DataTypeNullable>(type);
};
Block res = Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
if (hosts_to_wait)
res.erase("port");
return res;
}
DDLQueryStatusSource::DDLQueryStatusSource(
const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const std::optional<Strings> & hosts_to_wait)
: SourceWithProgress(getSampleBlock(context_, hosts_to_wait.has_value()), true)
@ -208,32 +234,6 @@ DDLQueryStatusSource::DDLQueryStatusSource(
timeout_seconds = context->getSettingsRef().distributed_ddl_task_timeout;
}
Block DDLQueryStatusSource::getSampleBlock(ContextPtr context_, bool hosts_to_wait)
{
auto output_mode = context_->getSettingsRef().distributed_ddl_output_mode;
auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
{
if (output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE)
return type;
return std::make_shared<DataTypeNullable>(type);
};
Block res = Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
if (hosts_to_wait)
res.erase("port");
return res;
}
std::pair<String, UInt16> DDLQueryStatusSource::parseHostAndPort(const String & host_id) const
{
String host = host_id;
@ -360,7 +360,7 @@ void DDLQueryStatusSource::work()
bool throw_if_error_on_host = context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW;
if (first_exception && throw_if_error_on_host)
throw Exception(*first_exception);
output.pushException(std::make_exception_ptr(*first_exception));
}
}

View File

@ -47,8 +47,6 @@ private:
std::pair<String, UInt16> parseHostAndPort(const String & host_id) const;
Block getSampleBlock(ContextPtr context, bool hosts_to_wait);
String node_path;
ContextPtr context;
Stopwatch watch;