This commit is contained in:
Alexander Tokmakov 2021-07-01 19:43:59 +03:00
parent d9a77e3a1a
commit 1b2416007e
2 changed files with 30 additions and 22 deletions

View File

@ -77,6 +77,7 @@ namespace ErrorCodes
{
extern const int INTO_OUTFILE_NOT_ALLOWED;
extern const int QUERY_WAS_CANCELLED;
extern const int LOGICAL_ERROR;
}
@ -1060,30 +1061,26 @@ void executeQuery(
}
else if (pipeline.initialized())
{
if (pipeline.isCompleted())
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
if (ast_query_with_output && ast_query_with_output->out_file)
{
pipeline.setProgressCallback(context->getProgressCallback());
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
}
else
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
if (!pipeline.isCompleted())
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
if (ast_query_with_output && ast_query_with_output->out_file)
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
pipeline.addSimpleTransform([](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
@ -1109,6 +1106,10 @@ void executeQuery(
pipeline.setOutputFormat(std::move(out));
}
else
{
pipeline.setProgressCallback(context->getProgressCallback());
}
{
auto executor = pipeline.execute();

View File

@ -1139,7 +1139,14 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
void StorageDistributed::flush()
{
flushClusterNodesAllData(getContext());
try
{
flushClusterNodesAllData(getContext());
}
catch (...)
{
tryLogCurrentException(log, "Cannot flush");
}
}
void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)