Refactor many exception messages

1. Always use fmt variant
2. Remove redundant period at the end of message
3. Remove useless parenthesis
This commit is contained in:
Amos Bird 2022-04-24 18:34:50 +08:00
parent e73d7fea31
commit a25bb50096
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
28 changed files with 328 additions and 248 deletions

View File

@ -11,6 +11,13 @@ class NetException : public Exception
public:
NetException(const std::string & msg, int code) : Exception(msg, code) {}
// Format message with fmt::format, like the logging functions.
template <typename... Args>
NetException(int code, fmt::format_string<Args...> fmt, Args &&... args)
: Exception(fmt::format(fmt, std::forward<Args>(args)...), code)
{
}
NetException * clone() const override { return new NetException(*this); }
void rethrow() const override { throw *this; }

View File

@ -34,12 +34,12 @@ ExecutingGraph::Edge & ExecutingGraph::addEdge(Edges & edges, Edge edge, const I
{
auto it = processors_map.find(to);
if (it == processors_map.end())
{
String msg = "Processor " + to->getName() + " was found as " + (edge.backward ? "input" : "output")
+ " for processor " + from->getName() + ", but not found in list of processors.";
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Processor {} was found as {} for processor {}, but not found in list of processors",
to->getName(),
edge.backward ? "input" : "output",
from->getName());
edge.to = it->second;
auto & added_edge = edges.emplace_back(std::move(edge));
@ -128,8 +128,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
{
auto * processor = processors[nodes.size()].get();
if (processors_map.contains(processor))
throw Exception("Processor " + processor->getName() + " was already added to pipeline.",
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Processor {} was already added to pipeline", processor->getName());
processors_map[processor] = nodes.size();
nodes.emplace_back(std::make_unique<Node>(processor, nodes.size()));

View File

@ -185,10 +185,10 @@ void PipelineExecutor::executeSingleThread(size_t thread_num)
auto & context = tasks.getThreadContext(thread_num);
LOG_TRACE(log,
"Thread finished. Total time: {} sec. Execution time: {} sec. Processing time: {} sec. Wait time: {} sec.",
(context.total_time_ns / 1e9),
(context.execution_time_ns / 1e9),
(context.processing_time_ns / 1e9),
(context.wait_time_ns / 1e9));
context.total_time_ns / 1e9,
context.execution_time_ns / 1e9,
context.processing_time_ns / 1e9,
context.wait_time_ns / 1e9);
#endif
}

View File

@ -77,7 +77,7 @@ size_t StreamingFormatExecutor::execute()
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:
case IProcessor::Status::ExpandPipeline:
throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Source processor returned status {}", IProcessor::statusToName(status));
}
}
}

View File

@ -25,7 +25,7 @@ public:
size_t getAnyThreadWithTasks(size_t from_thread = 0)
{
if (num_tasks == 0)
throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);
throw Exception("TaskQueue is empty", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < queues.size(); ++i)
{
@ -37,7 +37,7 @@ public:
from_thread = 0;
}
throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);
throw Exception("TaskQueue is empty", ErrorCodes::LOGICAL_ERROR);
}
Task * pop(size_t thread_num)

View File

@ -36,7 +36,7 @@ struct ThreadsQueue
void push(size_t thread)
{
if (unlikely(has(thread)))
throw Exception("Can't push thread because it is already in threads queue.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't push thread because it is already in threads queue", ErrorCodes::LOGICAL_ERROR);
swapThreads(thread, stack[stack_size]);
++stack_size;
@ -45,7 +45,7 @@ struct ThreadsQueue
void pop(size_t thread)
{
if (unlikely(!has(thread)))
throw Exception("Can't pop thread because it is not in threads queue.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't pop thread because it is not in threads queue", ErrorCodes::LOGICAL_ERROR);
--stack_size;
swapThreads(thread, stack[stack_size]);
@ -54,7 +54,7 @@ struct ThreadsQueue
size_t popAny()
{
if (unlikely(stack_size == 0))
throw Exception("Can't pop from empty queue.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't pop from empty queue", ErrorCodes::LOGICAL_ERROR);
--stack_size;
return stack[stack_size];

View File

@ -67,8 +67,7 @@ void IInflatingTransform::work()
if (can_generate)
{
if (generated)
throw Exception("IInflatingTransform cannot consume chunk because it already was generated",
ErrorCodes::LOGICAL_ERROR);
throw Exception("IInflatingTransform cannot consume chunk because it already was generated", ErrorCodes::LOGICAL_ERROR);
current_chunk = generate();
generated = true;
@ -77,8 +76,7 @@ void IInflatingTransform::work()
else
{
if (!has_input)
throw Exception("IInflatingTransform cannot consume chunk because it wasn't read",
ErrorCodes::LOGICAL_ERROR);
throw Exception("IInflatingTransform cannot consume chunk because it wasn't read", ErrorCodes::LOGICAL_ERROR);
consume(std::move(current_chunk));
has_input = false;

View File

@ -178,7 +178,7 @@ public:
*/
virtual Status prepare()
{
throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'prepare' is not implemented for {} processor", getName());
}
using PortNumbers = std::vector<UInt64>;
@ -193,7 +193,7 @@ public:
*/
virtual void work()
{
throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'work' is not implemented for {} processor", getName());
}
/** Executor must call this method when 'prepare' returned Async.
@ -212,7 +212,7 @@ public:
*/
virtual int schedule()
{
throw Exception("Method 'schedule' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'schedule' is not implemented for {} processor", getName());
}
/** You must call this method if 'prepare' returned ExpandPipeline.
@ -226,7 +226,7 @@ public:
*/
virtual Processors expandPipeline()
{
throw Exception("Method 'expandPipeline' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'expandPipeline' is not implemented for {} processor", getName());
}
/// In case if query was cancelled executor will wait till all processors finish their jobs.
@ -258,7 +258,7 @@ public:
++number;
}
throw Exception("Can't find input port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find input port for {} processor", getName());
}
UInt64 getOutputPortNumber(const OutputPort * output_port) const
@ -272,7 +272,7 @@ public:
++number;
}
throw Exception("Can't find output port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find output port for {} processor", getName());
}
const auto & getInputs() const { return inputs; }

View File

@ -46,7 +46,7 @@ public:
virtual void transform(Chunk &)
{
throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method transform is not implemented for {}", getName());
}
Status prepare() override;

View File

@ -19,7 +19,7 @@ LimitTransform::LimitTransform(
, with_ties(with_ties_), description(std::move(description_))
{
if (num_streams != 1 && with_ties)
throw Exception("Cannot use LimitTransform with multiple ports and ties.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot use LimitTransform with multiple ports and ties", ErrorCodes::LOGICAL_ERROR);
ports_data.resize(num_streams);
@ -86,8 +86,7 @@ IProcessor::Status LimitTransform::prepare(
return;
default:
throw Exception(
"Unexpected status for LimitTransform::preparePair : " + IProcessor::statusToName(status),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR, "Unexpected status for LimitTransform::preparePair : {}", IProcessor::statusToName(status));
}
};
@ -126,7 +125,7 @@ IProcessor::Status LimitTransform::prepare(
LimitTransform::Status LimitTransform::prepare()
{
if (ports_data.size() != 1)
throw Exception("prepare without arguments is not supported for multi-port LimitTransform.",
throw Exception("prepare without arguments is not supported for multi-port LimitTransform",
ErrorCodes::LOGICAL_ERROR);
return prepare({0}, {0});

View File

@ -153,7 +153,7 @@ struct RowRef
return true;
}
bool hasEqualSortColumnsWith(const RowRef & other)
bool hasEqualSortColumnsWith(const RowRef & other) const
{
return checkEquals(num_columns, sort_columns, row_num, other.sort_columns, other.row_num);
}
@ -197,7 +197,7 @@ struct RowRefWithOwnedChunk
sort_columns = &owned_chunk->sort_columns;
}
bool hasEqualSortColumnsWith(const RowRefWithOwnedChunk & other)
bool hasEqualSortColumnsWith(const RowRefWithOwnedChunk & other) const
{
return RowRef::checkEquals(sort_columns->size(), sort_columns->data(), row_num,
other.sort_columns->data(), other.row_num);

View File

@ -32,9 +32,7 @@ OffsetTransform::OffsetTransform(
}
IProcessor::Status OffsetTransform::prepare(
const PortNumbers & updated_input_ports,
const PortNumbers & updated_output_ports)
IProcessor::Status OffsetTransform::prepare(const PortNumbers & updated_input_ports, const PortNumbers & updated_output_ports)
{
bool has_full_port = false;
@ -63,9 +61,7 @@ IProcessor::Status OffsetTransform::prepare(
return;
default:
throw Exception(
"Unexpected status for OffsetTransform::preparePair : " + IProcessor::statusToName(status),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR, "Unexpected status for OffsetTransform::preparePair : {}", IProcessor::statusToName(status));
}
};
@ -88,7 +84,7 @@ IProcessor::Status OffsetTransform::prepare(
OffsetTransform::Status OffsetTransform::prepare()
{
if (ports_data.size() != 1)
throw Exception("prepare without arguments is not supported for multi-port OffsetTransform.",
throw Exception("prepare without arguments is not supported for multi-port OffsetTransform",
ErrorCodes::LOGICAL_ERROR);
return prepare({0}, {0});

View File

@ -16,7 +16,7 @@ void connect(OutputPort & output, InputPort & input)
auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();
assertCompatibleHeader(output.getHeader(), input.getHeader(), " function connect between " + out_name + " and " + in_name);
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format(" function connect between {} and {}", out_name, in_name));
input.output_port = &output;
output.input_port = &input;

View File

@ -89,7 +89,7 @@ protected:
DataPtr() : data(new Data())
{
if (unlikely((getUInt(data) & FLAGS_MASK) != 0))
throw Exception("Not alignment memory for Port.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Not alignment memory for Port", ErrorCodes::LOGICAL_ERROR);
}
/// Pointer can store flags in case of exception in swap.
~DataPtr() { delete getPtr(getUInt(data) & PTR_MASK); }
@ -133,7 +133,7 @@ protected:
State() : data(new Data())
{
if (unlikely((getUInt(data) & FLAGS_MASK) != 0))
throw Exception("Not alignment memory for Port.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Not alignment memory for Port", ErrorCodes::LOGICAL_ERROR);
}
~State()
@ -160,7 +160,7 @@ protected:
/// throw Exception("Cannot push block to port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (unlikely(flags & HAS_DATA))
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot push block to port which already has data", ErrorCodes::LOGICAL_ERROR);
}
void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags, bool set_not_needed = false)
@ -174,10 +174,10 @@ protected:
/// It's ok to check because this flag can be changed only by pulling thread.
if (unlikely((flags & IS_NEEDED) == 0) && !set_not_needed)
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot pull block from port which is not needed", ErrorCodes::LOGICAL_ERROR);
if (unlikely((flags & HAS_DATA) == 0))
throw Exception("Cannot pull block from port which has no data.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot pull block from port which has no data", ErrorCodes::LOGICAL_ERROR);
}
std::uintptr_t ALWAYS_INLINE setFlags(std::uintptr_t flags, std::uintptr_t mask)
@ -289,13 +289,15 @@ public:
{
auto & chunk = data->chunk;
String msg = "Invalid number of columns in chunk pulled from OutputPort. Expected "
+ std::to_string(header.columns()) + ", found " + std::to_string(chunk.getNumColumns()) + '\n';
msg += "Header: " + header.dumpStructure() + '\n';
msg += "Chunk: " + chunk.dumpStructure() + '\n';
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns in chunk pulled from OutputPort. Expected {}, found {}\n"
"Header: {}\n"
"Chunk: {}\n",
header.columns(),
chunk.getNumColumns(),
header.dumpStructure(),
chunk.dumpStructure());
}
return std::move(*data);
@ -403,14 +405,15 @@ public:
{
if (unlikely(!data_.exception && data_.chunk.getNumColumns() != header.columns()))
{
String msg = "Invalid number of columns in chunk pushed to OutputPort. Expected "
+ std::to_string(header.columns())
+ ", found " + std::to_string(data_.chunk.getNumColumns()) + '\n';
msg += "Header: " + header.dumpStructure() + '\n';
msg += "Chunk: " + data_.chunk.dumpStructure() + '\n';
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns in chunk pushed to OutputPort. Expected {}, found {}\n"
"Header: {}\n"
"Chunk: {}\n",
header.columns(),
data_.chunk.getNumColumns(),
header.dumpStructure(),
data_.chunk.dumpStructure());
}
updateVersion();

View File

@ -64,22 +64,25 @@ void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<std::unique_ptr<Qu
const auto & inputs = step->getInputStreams();
size_t num_inputs = step->getInputStreams().size();
if (num_inputs != plans.size())
{
throw Exception("Cannot unite QueryPlans using " + step->getName() +
" because step has different number of inputs. "
"Has " + std::to_string(plans.size()) + " plans "
"and " + std::to_string(num_inputs) + " inputs", ErrorCodes::LOGICAL_ERROR);
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot unite QueryPlans using {} because step has different number of inputs. Has {} plans and {} inputs",
step->getName(),
plans.size(),
num_inputs);
for (size_t i = 0; i < num_inputs; ++i)
{
const auto & step_header = inputs[i].header;
const auto & plan_header = plans[i]->getCurrentDataStream().header;
if (!blocksHaveEqualStructure(step_header, plan_header))
throw Exception("Cannot unite QueryPlans using " + step->getName() + " because "
"it has incompatible header with plan " + root->step->getName() + " "
"plan header: " + plan_header.dumpStructure() +
"step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot unite QueryPlans using {} because it has incompatible header with plan {} plan header: {} step header: {}",
step->getName(),
root->step->getName(),
plan_header.dumpStructure(),
step_header.dumpStructure());
}
for (auto & plan : plans)
@ -108,8 +111,10 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
if (num_input_streams == 0)
{
if (isInitialized())
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"step has no inputs, but QueryPlan is already initialized", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add step {} to QueryPlan because step has no inputs, but QueryPlan is already initialized",
step->getName());
nodes.emplace_back(Node{.step = std::move(step)});
root = &nodes.back();
@ -119,25 +124,33 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
if (num_input_streams == 1)
{
if (!isInitialized())
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"step has input, but QueryPlan is not initialized", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add step {} to QueryPlan because step has input, but QueryPlan is not initialized",
step->getName());
const auto & root_header = root->step->getOutputStream().header;
const auto & step_header = step->getInputStreams().front().header;
if (!blocksHaveEqualStructure(root_header, step_header))
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"it has incompatible header with root step " + root->step->getName() + " "
"root header: " + root_header.dumpStructure() +
"step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add step {} to QueryPlan because it has incompatible header with root step {} root header: {} step header: {}",
step->getName(),
root->step->getName(),
root_header.dumpStructure(),
step_header.dumpStructure());
nodes.emplace_back(Node{.step = std::move(step), .children = {root}});
root = &nodes.back();
return;
}
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because it has " +
std::to_string(num_input_streams) + " inputs but " + std::to_string(isInitialized() ? 1 : 0) +
" input expected", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add step {} to QueryPlan because it has {} inputs but {} input expected",
step->getName(),
num_input_streams,
isInitialized() ? 1 : 0);
}
QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(

View File

@ -343,12 +343,12 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
inputs_with_data.pop();
if (input_with_data.waiting_output == -1)
throw Exception("No associated output for input with data.", ErrorCodes::LOGICAL_ERROR);
throw Exception("No associated output for input with data", ErrorCodes::LOGICAL_ERROR);
auto & waiting_output = output_ports[input_with_data.waiting_output];
if (waiting_output.status == OutputStatus::NotActive)
throw Exception("Invalid status NotActive for associated output.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Invalid status NotActive for associated output", ErrorCodes::LOGICAL_ERROR);
if (waiting_output.status != OutputStatus::Finished)
{

View File

@ -337,8 +337,11 @@ void MySQLSource::initPositionMappingFromQueryResultStructure()
if (!settings->fetch_by_name)
{
if (description.sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
+ toString(description.sample_block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
throw Exception(
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH,
"mysqlxx::UseQueryResult contains {} columns while {} expected",
connection->result.getNumFields(),
description.sample_block.columns());
for (const auto idx : collections::range(0, connection->result.getNumFields()))
position_mapping[idx] = idx;
@ -362,18 +365,10 @@ void MySQLSource::initPositionMappingFromQueryResultStructure()
}
if (!missing_names.empty())
{
WriteBufferFromOwnString exception_message;
for (auto iter = missing_names.begin(); iter != missing_names.end(); ++iter)
{
if (iter != missing_names.begin())
exception_message << ", ";
exception_message << *iter;
}
throw Exception("mysqlxx::UseQueryResult must be contain the" + exception_message.str() + " columns.",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
}
throw Exception(
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH,
"mysqlxx::UseQueryResult must contain columns: {}",
fmt::join(missing_names, ", "));
}
}

View File

@ -18,10 +18,7 @@ class SQLiteSource : public SourceWithProgress
using SQLitePtr = std::shared_ptr<sqlite3>;
public:
SQLiteSource(SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
UInt64 max_block_size_);
SQLiteSource(SQLitePtr sqlite_db_, const String & query_str_, const Block & sample_block, UInt64 max_block_size_);
String getName() const override { return "SQLite"; }

View File

@ -125,7 +125,7 @@ public:
ssize_t res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
if (-1 == res && errno != EINTR)
throwFromErrno("Cannot read from pipe ", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
if (res == 0)
break;
@ -187,7 +187,7 @@ public:
ssize_t res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
if ((-1 == res || 0 == res) && errno != EINTR)
throwFromErrno("Cannot write into pipe ", ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
throwFromErrno("Cannot write into pipe", ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
if (res > 0)
bytes_written += res;

View File

@ -12,23 +12,27 @@ namespace ErrorCodes
static void checkSingleInput(const IProcessor & transform)
{
if (transform.getInputs().size() != 1)
throw Exception("Transform for chain should have single input, "
"but " + transform.getName() + " has " +
toString(transform.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transform for chain should have single input, but {} has {} inputs",
transform.getName(),
transform.getInputs().size());
if (transform.getInputs().front().isConnected())
throw Exception("Transform for chain has connected input.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Transform for chain has connected input", ErrorCodes::LOGICAL_ERROR);
}
static void checkSingleOutput(const IProcessor & transform)
{
if (transform.getOutputs().size() != 1)
throw Exception("Transform for chain should have single output, "
"but " + transform.getName() + " has " +
toString(transform.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transform for chain should have single output, but {} has {} outputs",
transform.getName(),
transform.getOutputs().size());
if (transform.getOutputs().front().isConnected())
throw Exception("Transform for chain has connected input.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Transform for chain has connected output", ErrorCodes::LOGICAL_ERROR);
}
static void checkTransform(const IProcessor & transform)
@ -40,7 +44,7 @@ static void checkTransform(const IProcessor & transform)
static void checkInitialized(const std::list<ProcessorPtr> & processors)
{
if (processors.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Drain is not initialized");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chain is not initialized");
}
Chain::Chain(ProcessorPtr processor)
@ -61,15 +65,17 @@ Chain::Chain(std::list<ProcessorPtr> processors_) : processors(std::move(process
{
for (const auto & input : processor->getInputs())
if (&input != &getInputPort() && !input.isConnected())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot initialize chain because there is a not connected input for {}",
processor->getName());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot initialize chain because there is a disconnected input for {}",
processor->getName());
for (const auto & output : processor->getOutputs())
if (&output != &getOutputPort() && !output.isConnected())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot initialize chain because there is a not connected output for {}",
processor->getName());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot initialize chain because there is a disconnected output for {}",
processor->getName());
}
}

View File

@ -61,15 +61,19 @@ void ExecutionSpeedLimits::throttle(
{
auto rows_per_second = read_rows / elapsed_seconds;
if (min_execution_rps && rows_per_second < min_execution_rps)
throw Exception("Query is executing too slow: " + toString(read_rows / elapsed_seconds)
+ " rows/sec., minimum: " + toString(min_execution_rps),
ErrorCodes::TOO_SLOW);
throw Exception(
ErrorCodes::TOO_SLOW,
"Query is executing too slow: {} rows/sec., minimum: {}",
read_rows / elapsed_seconds,
min_execution_rps);
auto bytes_per_second = read_bytes / elapsed_seconds;
if (min_execution_bps && bytes_per_second < min_execution_bps)
throw Exception("Query is executing too slow: " + toString(read_bytes / elapsed_seconds)
+ " bytes/sec., minimum: " + toString(min_execution_bps),
ErrorCodes::TOO_SLOW);
throw Exception(
ErrorCodes::TOO_SLOW,
"Query is executing too slow: {} bytes/sec., minimum: {}",
read_bytes / elapsed_seconds,
min_execution_bps);
/// If the predicted execution time is longer than `max_execution_time`.
if (max_execution_time != 0 && total_rows_to_read && read_rows)
@ -77,10 +81,12 @@ void ExecutionSpeedLimits::throttle(
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows_to_read) / read_rows);
if (estimated_execution_time_seconds > max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
+ " is too long. Maximum: " + toString(max_execution_time.totalSeconds())
+ ". Estimated rows to process: " + toString(total_rows_to_read),
ErrorCodes::TOO_SLOW);
throw Exception(
ErrorCodes::TOO_SLOW,
"Estimated query execution time ({} seconds) is too long. Maximum: {}. Estimated rows to process: {}",
estimated_execution_time_seconds,
max_execution_time.totalSeconds(),
total_rows_to_read);
}
if (max_execution_rps && rows_per_second >= max_execution_rps)
@ -92,12 +98,13 @@ void ExecutionSpeedLimits::throttle(
}
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
template <typename... Args>
static bool handleOverflowMode(OverflowMode mode, int code, fmt::format_string<Args...> fmt, Args &&... args)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
throw Exception(code, std::move(fmt), std::forward<Args>(args)...);
case OverflowMode::BREAK:
return false;
default:
@ -112,10 +119,12 @@ bool ExecutionSpeedLimits::checkTimeLimit(const Stopwatch & stopwatch, OverflowM
auto elapsed_ns = stopwatch.elapsed();
if (elapsed_ns > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(overflow_mode,
"Timeout exceeded: elapsed " + toString(static_cast<double>(elapsed_ns) / 1000000000ULL)
+ " seconds, maximum: " + toString(max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return handleOverflowMode(
overflow_mode,
ErrorCodes::TIMEOUT_EXCEEDED,
"Timeout exceeded: elapsed {} seconds, maximum: {}",
static_cast<double>(elapsed_ns) / 1000000000ULL,
max_execution_time.totalMicroseconds() / 1000000.0);
}
return true;

View File

@ -23,16 +23,18 @@ namespace ErrorCodes
static void checkSource(const IProcessor & source)
{
if (!source.getInputs().empty())
throw Exception("Source for pipe shouldn't have any input, but " + source.getName() + " has " +
toString(source.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for pipe shouldn't have any input, but {} has {} inputs",
source.getName(),
source.getInputs().size());
if (source.getOutputs().empty())
throw Exception("Source for pipe should have single output, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
if (source.getOutputs().size() > 1)
throw Exception("Source for pipe should have single output, but " + source.getName() + " has " +
toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
if (source.getOutputs().size() != 1)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for pipe should have single output, but {} has {} outputs",
source.getName(),
source.getOutputs().size());
}
static OutputPort * uniteExtremes(const OutputPortRawPtrs & ports, const Block & header, Processors & processors)
@ -112,8 +114,11 @@ PipelineResourcesHolder Pipe::detachResources()
Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes)
{
if (!source->getInputs().empty())
throw Exception("Source for pipe shouldn't have any input, but " + source->getName() + " has " +
toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for pipe shouldn't have any input, but {} has {} inputs",
source->getName(),
source->getInputs().size());
if (!output)
throw Exception("Cannot create Pipe from source because specified output port is nullptr",
@ -141,8 +146,7 @@ Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, Output
auto it = std::find_if(outputs.begin(), outputs.end(), [port](const OutputPort & p) { return &p == port; });
if (it == outputs.end())
throw Exception("Cannot create Pipe because specified " + name + " port does not belong to source",
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot create Pipe because specified {} port does not belong to source", name);
};
check_port_from_source(output, "output");
@ -150,9 +154,11 @@ Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, Output
check_port_from_source(extremes, "extremes");
if (num_specified_ports != outputs.size())
throw Exception("Cannot create Pipe from source because it has " + std::to_string(outputs.size()) +
" output ports, but " + std::to_string(num_specified_ports) + " were specified",
ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create Pipe from source because it has {} output ports, but {} were specified",
outputs.size(),
num_specified_ports);
}
totals_port = totals;
@ -188,14 +194,16 @@ Pipe::Pipe(Processors processors_) : processors(std::move(processors_))
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Cannot create Pipe because processor {} has disconnected input port", processor->getName());
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (!set.contains(connected_processor))
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create Pipe because processor {} has input port which is connected with unknown processor {}",
processor->getName(),
connected_processor->getName());
}
for (auto & port : processor->getOutputs())
@ -208,14 +216,16 @@ Pipe::Pipe(Processors processors_) : processors(std::move(processors_))
const auto * connected_processor = &port.getInputPort().getProcessor();
if (!set.contains(connected_processor))
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create Pipe because processor {} has output port which is connected with unknown processor {}",
processor->getName(),
connected_processor->getName());
}
}
if (output_ports.empty())
throw Exception("Cannot create Pipe because processors don't have any not-connected output ports",
throw Exception("Cannot create Pipe because processors don't have any disconnected output ports",
ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
@ -365,10 +375,10 @@ void Pipe::addSource(ProcessorPtr source)
void Pipe::addTotalsSource(ProcessorPtr source)
{
if (output_ports.empty())
throw Exception("Cannot add totals source to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot add totals source to empty Pipe", ErrorCodes::LOGICAL_ERROR);
if (totals_port)
throw Exception("Totals source was already added to Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Totals source was already added to Pipe", ErrorCodes::LOGICAL_ERROR);
checkSource(*source);
const auto & source_header = output_ports.front()->getHeader();
@ -385,10 +395,10 @@ void Pipe::addTotalsSource(ProcessorPtr source)
void Pipe::addExtremesSource(ProcessorPtr source)
{
if (output_ports.empty())
throw Exception("Cannot add extremes source to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot add extremes source to empty Pipe", ErrorCodes::LOGICAL_ERROR);
if (extremes_port)
throw Exception("Extremes source was already added to Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Extremes source was already added to Pipe", ErrorCodes::LOGICAL_ERROR);
checkSource(*source);
const auto & source_header = output_ports.front()->getHeader();
@ -435,20 +445,23 @@ void Pipe::addTransform(ProcessorPtr transform)
void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes)
{
if (output_ports.empty())
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot add transform to empty Pipe", ErrorCodes::LOGICAL_ERROR);
auto & inputs = transform->getInputs();
if (inputs.size() != output_ports.size())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"Processor has " + std::to_string(inputs.size()) + " input ports, "
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipe because it has {} input ports, but {} expected",
transform->getName(),
inputs.size(),
output_ports.size());
if (totals && totals_port)
throw Exception("Cannot add transform with totals to Pipe because it already has totals.",
throw Exception("Cannot add transform with totals to Pipe because it already has totals",
ErrorCodes::LOGICAL_ERROR);
if (extremes && extremes_port)
throw Exception("Cannot add transform with extremes to Pipe because it already has extremes.",
throw Exception("Cannot add transform with extremes to Pipe because it already has extremes",
ErrorCodes::LOGICAL_ERROR);
if (totals)
@ -515,21 +528,24 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes)
{
if (output_ports.empty())
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot add transform to empty Pipe", ErrorCodes::LOGICAL_ERROR);
auto & inputs = transform->getInputs();
size_t expected_inputs = output_ports.size() + (totals ? 1 : 0) + (extremes ? 1 : 0);
if (inputs.size() != expected_inputs)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"Processor has " + std::to_string(inputs.size()) + " input ports, "
"but " + std::to_string(expected_inputs) + " expected", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipe because it has {} input ports, but {} expected",
transform->getName(),
inputs.size(),
expected_inputs);
if (totals && !totals_port)
throw Exception("Cannot add transform consuming totals to Pipe because Pipe does not have totals.",
throw Exception("Cannot add transform consuming totals to Pipe because Pipe does not have totals",
ErrorCodes::LOGICAL_ERROR);
if (extremes && !extremes_port)
throw Exception("Cannot add transform consuming extremes to Pipe because it already has extremes.",
throw Exception("Cannot add transform consuming extremes to Pipe because it already has extremes",
ErrorCodes::LOGICAL_ERROR);
if (totals)
@ -561,17 +577,20 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort *
}
if (totals && !found_totals)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"specified totals port does not belong to it", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified totals port does not belong to it",
transform->getName());
if (extremes && !found_extremes)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"specified extremes port does not belong to it", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified extremes port does not belong to it",
transform->getName());
auto & outputs = transform->getOutputs();
if (outputs.empty())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because it has no outputs", transform->getName());
output_ports.clear();
output_ports.reserve(outputs.size());
@ -614,14 +633,18 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Processor for query pipeline transform should have single input, but {} has {} inputs",
transform->getName(),
transform->getInputs().size());
if (transform->getOutputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single output, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Processor for query pipeline transform should have single output, but {} has {} outputs",
transform->getName(),
transform->getOutputs().size());
}
const auto & out_header = transform ? transform->getOutputs().front().getHeader()
@ -661,10 +684,11 @@ void Pipe::addSimpleTransform(const ProcessorGetter & getter)
void Pipe::addChains(std::vector<Chain> chains)
{
if (output_ports.size() != chains.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot add chains to Pipe because "
"number of output ports ({}) is not equal to the number of chains ({})",
output_ports.size(), chains.size());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add chains to Pipe because number of output ports ({}) is not equal to the number of chains ({})",
output_ports.size(),
chains.size());
dropTotals();
dropExtremes();
@ -702,7 +726,7 @@ void Pipe::addChains(std::vector<Chain> chains)
void Pipe::resize(size_t num_streams, bool force, bool strict)
{
if (output_ports.empty())
throw Exception("Cannot resize an empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot resize an empty Pipe", ErrorCodes::LOGICAL_ERROR);
if (!force && num_streams == numOutputPorts())
return;
@ -720,7 +744,7 @@ void Pipe::resize(size_t num_streams, bool force, bool strict)
void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
{
if (output_ports.empty())
throw Exception("Cannot set sink to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot set sink to empty Pipe", ErrorCodes::LOGICAL_ERROR);
auto add_transform = [&](OutputPort *& stream, Pipe::StreamType stream_type)
{
@ -732,14 +756,18 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Sink for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Sink for query pipeline transform should have single input, but {} has {} inputs",
transform->getName(),
transform->getInputs().size());
if (!transform->getOutputs().empty())
throw Exception("Sink for query pipeline transform should have no outputs, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Sink for query pipeline transform should have no outputs, but {} has {} outputs",
transform->getName(),
transform->getOutputs().size());
}
if (!transform)
@ -762,7 +790,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
void Pipe::transform(const Transformer & transformer)
{
if (output_ports.empty())
throw Exception("Cannot transform empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR);
auto new_processors = transformer(output_ports);
@ -774,8 +802,10 @@ void Pipe::transform(const Transformer & transformer)
for (const auto & port : output_ports)
{
if (!port->isConnected())
throw Exception("Transformation of Pipe is not valid because output port (" +
port->getHeader().dumpStructure() + ") is not connected", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because output port ({})",
port->getHeader().dumpStructure());
set.emplace(&port->getProcessor());
}
@ -787,14 +817,18 @@ void Pipe::transform(const Transformer & transformer)
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has not connected input port",
processor->getName());
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (!set.contains(connected_processor))
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has input port which is connected with unknown processor {}",
processor->getName(),
connected_processor->getName());
}
for (auto & port : processor->getOutputs())
@ -807,15 +841,17 @@ void Pipe::transform(const Transformer & transformer)
const auto * connected_processor = &port.getInputPort().getProcessor();
if (!set.contains(connected_processor))
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has output port which is connected with unknown processor {}",
processor->getName(),
connected_processor->getName());
}
}
if (output_ports.empty())
throw Exception("Transformation of Pipe is not valid because processors don't have any "
"not-connected output ports", ErrorCodes::LOGICAL_ERROR);
throw Exception(
"Transformation of Pipe is not valid because processors don't have any disconnected output ports", ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)

View File

@ -18,7 +18,7 @@ struct PipelineResourcesHolder
PipelineResourcesHolder();
PipelineResourcesHolder(PipelineResourcesHolder &&) noexcept;
~PipelineResourcesHolder();
/// Custom mode assignment does not destroy data from lhs. It appends data from rhs to lhs.
/// Custom move assignment does not destroy data from lhs. It appends data from rhs to lhs.
PipelineResourcesHolder& operator=(PipelineResourcesHolder &&) noexcept;
/// Some processors may implicitly use Context or temporary Storage created by Interpreter.

View File

@ -33,7 +33,7 @@ static void checkInput(const InputPort & input, const ProcessorPtr & processor)
if (!input.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create QueryPipeline because {} has not connected input",
"Cannot create QueryPipeline because {} has disconnected input",
processor->getName());
}
@ -42,7 +42,7 @@ static void checkOutput(const OutputPort & output, const ProcessorPtr & processo
if (!output.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create QueryPipeline because {} has not connected output",
"Cannot create QueryPipeline because {} has disconnected output",
processor->getName());
}

View File

@ -37,7 +37,7 @@ void QueryPipelineBuilder::addQueryPlan(std::unique_ptr<QueryPlan> plan)
void QueryPipelineBuilder::checkInitialized()
{
if (!initialized())
throw Exception("QueryPipeline wasn't initialized.", ErrorCodes::LOGICAL_ERROR);
throw Exception("QueryPipeline is uninitialized", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipelineBuilder::checkInitializedAndNotCompleted()
@ -45,35 +45,44 @@ void QueryPipelineBuilder::checkInitializedAndNotCompleted()
checkInitialized();
if (pipe.isCompleted())
throw Exception("QueryPipeline was already completed.", ErrorCodes::LOGICAL_ERROR);
throw Exception("QueryPipeline is already completed", ErrorCodes::LOGICAL_ERROR);
}
static void checkSource(const ProcessorPtr & source, bool can_have_totals)
{
if (!source->getInputs().empty())
throw Exception("Source for query pipeline shouldn't have any input, but " + source->getName() + " has " +
toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for query pipeline shouldn't have any input, but {} has {} inputs",
source->getName(),
source->getInputs().size());
if (source->getOutputs().empty())
throw Exception("Source for query pipeline should have single output, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Source for query pipeline should have single output, but {} doesn't have any", source->getName());
if (!can_have_totals && source->getOutputs().size() != 1)
throw Exception("Source for query pipeline should have single output, but " + source->getName() + " has " +
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for query pipeline should have single output, but {} has {} outputs",
source->getName(),
source->getOutputs().size());
if (source->getOutputs().size() > 2)
throw Exception("Source for query pipeline should have 1 or 2 outputs, but " + source->getName() + " has " +
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for query pipeline should have 1 or 2 output, but {} has {} outputs",
source->getName(),
source->getOutputs().size());
}
void QueryPipelineBuilder::init(Pipe pipe_)
{
if (initialized())
throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Pipeline has already been initialized", ErrorCodes::LOGICAL_ERROR);
if (pipe_.empty())
throw Exception("Can't initialize pipeline with empty pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't initialize pipeline with empty pipe", ErrorCodes::LOGICAL_ERROR);
pipe = std::move(pipe_);
}
@ -81,10 +90,10 @@ void QueryPipelineBuilder::init(Pipe pipe_)
void QueryPipelineBuilder::init(QueryPipeline pipeline)
{
if (initialized())
throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Pipeline has already been initialized", ErrorCodes::LOGICAL_ERROR);
if (pipeline.pushing())
throw Exception("Can't initialize pushing pipeline.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't initialize pushing pipeline", ErrorCodes::LOGICAL_ERROR);
pipe.holder = std::move(pipeline.resources);
pipe.processors = std::move(pipeline.processors);
@ -191,11 +200,10 @@ void QueryPipelineBuilder::addTotalsHavingTransform(ProcessorPtr transform)
checkInitializedAndNotCompleted();
if (!typeid_cast<const TotalsHavingTransform *>(transform.get()))
throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
ErrorCodes::LOGICAL_ERROR);
throw Exception("TotalsHavingTransform is expected for QueryPipeline::addTotalsHavingTransform", ErrorCodes::LOGICAL_ERROR);
if (pipe.getTotalsPort())
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Totals having transform was already added to pipeline", ErrorCodes::LOGICAL_ERROR);
resize(1);
@ -208,7 +216,7 @@ void QueryPipelineBuilder::addDefaultTotals()
checkInitializedAndNotCompleted();
if (pipe.getTotalsPort())
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Totals having transform was already added to pipeline", ErrorCodes::LOGICAL_ERROR);
const auto & current_header = getHeader();
Columns columns;
@ -461,7 +469,7 @@ void QueryPipelineBuilder::setProcessListElement(QueryStatus * elem)
PipelineExecutorPtr QueryPipelineBuilder::execute()
{
if (!isCompleted())
throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot execute pipeline because it is not completed", ErrorCodes::LOGICAL_ERROR);
return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element);
}

View File

@ -78,8 +78,10 @@ RemoteInserter::RemoteInserter(
/// client's already got this information for remote table. Ignore.
}
else
throw NetException("Unexpected packet from server (expected Data or Exception, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet from server (expected Data or Exception, got {})",
Protocol::Server::toString(packet.type));
}
}
@ -131,8 +133,10 @@ void RemoteInserter::onFinish()
// Do nothing
}
else
throw NetException("Unexpected packet from server (expected EndOfStream or Exception, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet from server (expected EndOfStream or Exception, got {})",
Protocol::Server::toString(packet.type));
}
finished = true;

View File

@ -356,7 +356,7 @@ std::variant<Block, int> RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs
else
return read(*read_context);
}
throw Exception("Found duplicate uuids while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
throw Exception("Found duplicate uuids while processing query", ErrorCodes::DUPLICATED_PART_UUIDS);
}
std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
@ -432,8 +432,10 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
default:
got_unknown_packet_from_replica = true;
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
throw Exception(
ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
"Unknown packet {} from one of the following replicas: {}",
packet.type,
connections->dumpAddresses());
}

View File

@ -12,12 +12,20 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int too_man
if (overflow_mode == OverflowMode::THROW)
{
if (max_rows && rows > max_rows)
throw Exception("Limit for " + std::string(what) + " exceeded, max rows: " + formatReadableQuantity(max_rows)
+ ", current rows: " + formatReadableQuantity(rows), too_many_rows_exception_code);
throw Exception(
too_many_rows_exception_code,
"Limit for {} exceeded, max rows: {}, current rows: {}",
what,
formatReadableQuantity(max_rows),
formatReadableQuantity(rows));
if (max_bytes && bytes > max_bytes)
throw Exception(fmt::format("Limit for {} exceeded, max bytes: {}, current bytes: {}",
std::string(what), ReadableSize(max_bytes), ReadableSize(bytes)), too_many_bytes_exception_code);
throw Exception(
too_many_bytes_exception_code,
"Limit for {} exceeded, max bytes: {}, current bytes: {}",
what,
ReadableSize(max_bytes),
ReadableSize(bytes));
return true;
}