Merge branch 'master' into to_start_of_five_minutes

mergify[bot] 2022-04-25 17:36:38 +00:00 committed by GitHub
commit d2ac9b2223
74 changed files with 908 additions and 522 deletions

View File

@ -92,6 +92,38 @@ jobs:
with:
name: changed_images
path: ${{ runner.temp }}/changed_images.json
StyleCheck:
needs: DockerHubPush
runs-on: [self-hosted, style-checker]
if: ${{ success() || failure() }}
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{ runner.temp }}/style_check
EOF
- name: Download changed images
# even if artifact does not exist, e.g. on `do not test` label or failed Docker job
continue-on-error: true
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ env.TEMP_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Style Check
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 style_check.py
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
DocsCheck:
needs: DockerHubPush
runs-on: [self-hosted, func-tester-aarch64]

View File

@ -211,13 +211,8 @@ function run_tests
if [ -v CHPC_TEST_RUN_BY_HASH_TOTAL ]; then
# filter tests array in bash https://stackoverflow.com/a/40375567
for index in "${!test_files[@]}"; do
# sorry for this, just calculating hash(test_name) % total_tests_group == my_test_group_num
test_hash_result=$(echo test_files[$index] | perl -ne 'use Digest::MD5 qw(md5); print unpack('Q', md5($_)) % $ENV{CHPC_TEST_RUN_BY_HASH_TOTAL} == $ENV{CHPC_TEST_RUN_BY_HASH_NUM};')
# BTW, for some reason when hash(test_name) % total_tests_group != my_test_group_num perl outputs nothing, not zero
if [ "$test_hash_result" != "1" ]; then
# deleting element from array
[ $(( index % CHPC_TEST_RUN_BY_HASH_TOTAL )) != "$CHPC_TEST_RUN_BY_HASH_NUM" ] && \
unset -v 'test_files[$index]'
fi
done
# to have sequential indexes...
test_files=("${test_files[@]}")

View File

@ -1 +0,0 @@
../../../en/sql-reference/data-types/map.md

View File

@ -0,0 +1,107 @@
---
sidebar_position: 65
sidebar_label: Map(key, value)
---
# Map(key, value) {#data_type-map}
`Map(key, value)` stores `key:value` pairs.
**Parameters**
- `key` — The key part of the pair. Its type can be [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md), [LowCardinality](../../sql-reference/data-types/lowcardinality.md), or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `value` — The value part of the pair. Its type can be [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md), [Array](../../sql-reference/data-types/array.md), [LowCardinality](../../sql-reference/data-types/lowcardinality.md), or [FixedString](../../sql-reference/data-types/fixedstring.md).
Use `a['key']` to get the value from a column of type `a Map('key', 'value')`; this lookup has linear complexity.
**Examples**
Example table:
``` sql
CREATE TABLE table_map (a Map(String, UInt64)) ENGINE=Memory;
INSERT INTO table_map VALUES ({'key1':1, 'key2':10}), ({'key1':2,'key2':20}), ({'key1':3,'key2':30});
```
Select all `key2` values:
```sql
SELECT a['key2'] FROM table_map;
```
Result:
```text
┌─arrayElement(a, 'key2')─┐
│                      10 │
│                      20 │
│                      30 │
└─────────────────────────┘
```
If the queried `key` is not present in a `Map()` column, the result is the default for the `value` type: the number 0, an empty string, or an empty array.
```sql
INSERT INTO table_map VALUES ({'key3':100}), ({});
SELECT a['key3'] FROM table_map;
```
Result:
```text
┌─arrayElement(a, 'key3')─┐
│                     100 │
│                       0 │
└─────────────────────────┘
┌─arrayElement(a, 'key3')─┐
│                       0 │
│                       0 │
│                       0 │
└─────────────────────────┘
```
## Converting Tuple to Map {#map-and-tuple}
You can use the [CAST](../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) function to convert a `Tuple()` into a `Map()`:
``` sql
SELECT CAST(([1, 2, 3], ['Ready', 'Steady', 'Go']), 'Map(UInt8, String)') AS map;
```
``` text
┌─map───────────────────────────┐
│ {1:'Ready',2:'Steady',3:'Go'} │
└───────────────────────────────┘
```
## Map.keys and Map.values Subcolumns {#map-subcolumns}
To work with `Map` columns more efficiently, in some scenarios you can read only the `keys` or `values` subcolumn instead of reading the whole column.
**Example**
Query:
``` sql
CREATE TABLE t_map (`a` Map(String, UInt64)) ENGINE = Memory;
INSERT INTO t_map VALUES (map('key1', 1, 'key2', 2, 'key3', 3));
SELECT a.keys FROM t_map;
SELECT a.values FROM t_map;
```
Result:
``` text
┌─a.keys─────────────────┐
│ ['key1','key2','key3'] │
└────────────────────────┘
┌─a.values─┐
│ [1,2,3]  │
└──────────┘
```
**See Also**
- [map()](../../sql-reference/functions/tuple-map-functions.md#function-map) function
- [CAST()](../../sql-reference/functions/type-conversion-functions.md#type_conversion_function-cast) function
[Original article](https://clickhouse.com/docs/zh/sql-reference/data-types/map/) <!--hide-->

View File

@ -1 +0,0 @@
../../../../en/sql-reference/statements/alter/setting.md

View File

@ -0,0 +1,57 @@
---
sidebar_position: 38
sidebar_label: SETTING
---
# Table Settings Manipulations {#table_settings_manipulations}
This is a set of queries that change table settings. You can modify a setting or reset it to its default value. A single query can change several settings at once. If a setting with the specified name does not exist, the query throws an exception.
**Syntax**
``` sql
ALTER TABLE [db].name [ON CLUSTER cluster] MODIFY|RESET SETTING ...
```
!!! note "Note"
These queries can only be applied to [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md) tables.
## MODIFY SETTING {#alter_modify_setting}
Changes table settings.
**Syntax**
```sql
MODIFY SETTING setting_name=value [, ...]
```
**Example**
```sql
CREATE TABLE example_table (id UInt32, data String) ENGINE=MergeTree() ORDER BY id;
ALTER TABLE example_table MODIFY SETTING max_part_loading_threads=8, max_parts_in_total=50000;
```
## RESET SETTING {#alter_reset_setting}
Resets table settings to their default values. If a setting is already in its default state, no action is taken.
**Syntax**
```sql
RESET SETTING setting_name [, ...]
```
**Example**
```sql
CREATE TABLE example_table (id UInt32, data String) ENGINE=MergeTree() ORDER BY id
SETTINGS max_part_loading_threads=8;
ALTER TABLE example_table RESET SETTING max_part_loading_threads;
```
**See Also**
- [MergeTree settings](../../../operations/settings/merge-tree-settings.md)

View File

@ -16,6 +16,13 @@
#include <Interpreters/Access/InterpreterGrantQuery.h>
#include <Interpreters/Access/InterpreterShowCreateAccessEntityQuery.h>
#include <Interpreters/Access/InterpreterShowGrantsQuery.h>
#include <Parsers/Access/ASTCreateQuotaQuery.h>
#include <Parsers/Access/ASTCreateRoleQuery.h>
#include <Parsers/Access/ASTCreateRowPolicyQuery.h>
#include <Parsers/Access/ASTCreateSettingsProfileQuery.h>
#include <Parsers/Access/ASTCreateUserQuery.h>
#include <Parsers/Access/ASTGrantQuery.h>
#include <Parsers/ParserAttachAccessEntity.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <boost/range/algorithm/copy.hpp>

View File

@ -1,51 +1,11 @@
#pragma once
#include <Parsers/Access/ASTCreateQuotaQuery.h>
#include <Parsers/Access/ASTCreateRoleQuery.h>
#include <Parsers/Access/ASTCreateRowPolicyQuery.h>
#include <Parsers/Access/ASTCreateSettingsProfileQuery.h>
#include <Parsers/Access/ASTCreateUserQuery.h>
#include <Parsers/Access/ASTGrantQuery.h>
#include <Parsers/Access/ParserCreateQuotaQuery.h>
#include <Parsers/Access/ParserCreateRoleQuery.h>
#include <Parsers/Access/ParserCreateRowPolicyQuery.h>
#include <Parsers/Access/ParserCreateSettingsProfileQuery.h>
#include <Parsers/Access/ParserCreateUserQuery.h>
#include <Parsers/Access/ParserGrantQuery.h>
#include <base/types.h>
#include <memory>
namespace DB
{
/// Special parser for the 'ATTACH access entity' queries.
class ParserAttachAccessEntity : public IParserBase
{
protected:
const char * getName() const override { return "ATTACH access entity query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
ParserCreateUserQuery create_user_p;
ParserCreateRoleQuery create_role_p;
ParserCreateRowPolicyQuery create_policy_p;
ParserCreateQuotaQuery create_quota_p;
ParserCreateSettingsProfileQuery create_profile_p;
ParserGrantQuery grant_p;
create_user_p.useAttachMode();
create_role_p.useAttachMode();
create_policy_p.useAttachMode();
create_quota_p.useAttachMode();
create_profile_p.useAttachMode();
grant_p.useAttachMode();
return create_user_p.parse(pos, node, expected) || create_role_p.parse(pos, node, expected)
|| create_policy_p.parse(pos, node, expected) || create_quota_p.parse(pos, node, expected)
|| create_profile_p.parse(pos, node, expected) || grant_p.parse(pos, node, expected);
}
};
struct IAccessEntity;
using AccessEntityPtr = std::shared_ptr<const IAccessEntity>;

View File

@ -514,7 +514,7 @@ XMLDocumentPtr ConfigProcessor::processConfig(
else
{
/// These embedded files added during build with some cmake magic.
/// Look at the end of programs/sever/CMakeLists.txt.
/// Look at the end of programs/server/CMakeLists.txt.
std::string embedded_name;
if (path == "config.xml")
embedded_name = "embedded.xml";

View File

@ -619,6 +619,8 @@
M(648, WRONG_DDL_RENAMING_SETTINGS) \
M(649, INVALID_TRANSACTION) \
M(650, SERIALIZATION_ERROR) \
M(651, CAPN_PROTO_BAD_TYPE) \
M(652, ONLY_NULLS_WHILE_READING_SCHEMA) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -11,6 +11,13 @@ class NetException : public Exception
public:
NetException(const std::string & msg, int code) : Exception(msg, code) {}
// Format message with fmt::format, like the logging functions.
template <typename... Args>
NetException(int code, fmt::format_string<Args...> fmt, Args &&... args)
: Exception(fmt::format(fmt, std::forward<Args>(args)...), code)
{
}
NetException * clone() const override { return new NetException(*this); }
void rethrow() const override { throw *this; }

View File

@ -14,7 +14,7 @@ public:
{
}
size_t totalRanges() const { return static_cast<size_t>(round(static_cast<float>(total_size - from) / range_step)); }
size_t totalRanges() const { return static_cast<size_t>(ceil(static_cast<float>(total_size - from) / range_step)); }
using Range = std::pair<size_t, size_t>;

View File

@ -0,0 +1,22 @@
#include <Common/RangeGenerator.h>
#include <gtest/gtest.h>
using namespace DB;
TEST(RangeGenerator, Common)
{
RangeGenerator g{25, 10};
EXPECT_EQ(g.totalRanges(), 3);
std::vector<RangeGenerator::Range> ranges{{0, 10}, {10, 20}, {20, 25}};
for (size_t i = 0; i < 3; ++i)
{
auto r = g.nextRange();
EXPECT_TRUE(r);
EXPECT_EQ(r, ranges[i]);
}
auto r = g.nextRange();
EXPECT_TRUE(!r);
}
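The 25/10 case above does not by itself separate the two roundings, since round(2.5f) is also 3; a total size just under a half step does. Below is a hedged sketch of such a boundary-case test in the same style, not part of the commit:

``` cpp
TEST(RangeGenerator, PartialLastRange)
{
    // 24 / 10 = 2.4: round() would report 2 ranges and silently drop the
    // {20, 24} tail, while ceil() reports the correct 3.
    RangeGenerator g{24, 10};
    EXPECT_EQ(g.totalRanges(), 3);
    std::vector<RangeGenerator::Range> ranges{{0, 10}, {10, 20}, {20, 24}};
    for (size_t i = 0; i < 3; ++i)
    {
        auto r = g.nextRange();
        EXPECT_TRUE(r);
        EXPECT_EQ(r, ranges[i]);
    }
    EXPECT_TRUE(!g.nextRange());
}
```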

View File

@ -28,7 +28,7 @@ namespace ErrorCodes
extern const int FILE_DOESNT_EXIST;
extern const int UNKNOWN_EXCEPTION;
extern const int INCORRECT_DATA;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int CAPN_PROTO_BAD_TYPE;
}
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
@ -447,7 +447,7 @@ static DataTypePtr getEnumDataTypeFromEnumSchema(const capnp::EnumSchema & enum_
if (enumerants.size() < 32768)
return getEnumDataTypeFromEnumerants<Int16>(enumerants);
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "ClickHouse supports only 8 and 16-but Enums");
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "ClickHouse supports only 8 and 16-bit Enums");
}
static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type)
@ -495,17 +495,17 @@ static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type)
{
auto fields = struct_schema.getUnionFields();
if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid()))
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Unions are not supported");
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unions are not supported");
auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType();
if (value_type.isStruct() || value_type.isList())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Tuples and Lists cannot be inside Nullable");
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Tuples and Lists cannot be inside Nullable");
auto nested_type = getDataTypeFromCapnProtoType(value_type);
return std::make_shared<DataTypeNullable>(nested_type);
}
if (checkIfStructContainsUnnamedUnion(struct_schema))
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Unnamed union is not supported");
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");
/// Treat Struct as Tuple.
DataTypes nested_types;
@ -518,14 +518,14 @@ static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type)
return std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names));
}
default:
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type));
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type));
}
}
NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema)
{
if (checkIfStructContainsUnnamedUnion(schema))
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Unnamed union is not supported");
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");
NamesAndTypesList names_and_types;
for (auto field : schema.getNonUnionFields())

View File

@ -374,7 +374,7 @@ String FormatFactory::getContentType(
SchemaReaderPtr FormatFactory::getSchemaReader(
const String & name,
ReadBuffer & buf,
ContextPtr context,
ContextPtr & context,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & schema_reader_creator = dict.at(name).schema_reader_creator;
@ -387,7 +387,7 @@ SchemaReaderPtr FormatFactory::getSchemaReader(
ExternalSchemaReaderPtr FormatFactory::getExternalSchemaReader(
const String & name,
ContextPtr context,
ContextPtr & context,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & external_schema_reader_creator = dict.at(name).external_schema_reader_creator;

View File

@ -160,12 +160,12 @@ public:
SchemaReaderPtr getSchemaReader(
const String & name,
ReadBuffer & buf,
ContextPtr context,
ContextPtr & context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
ExternalSchemaReaderPtr getExternalSchemaReader(
const String & name,
ContextPtr context,
ContextPtr & context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);

View File

@ -15,8 +15,10 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int EMPTY_DATA_PASSED;
extern const int BAD_ARGUMENTS;
extern const int ONLY_NULLS_WHILE_READING_SCHEMA;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
static std::optional<NamesAndTypesList> getOrderedColumnsList(
@ -41,11 +43,17 @@ static std::optional<NamesAndTypesList> getOrderedColumnsList(
return res;
}
bool isRetryableSchemaInferenceError(int code)
{
return code == ErrorCodes::EMPTY_DATA_PASSED || code == ErrorCodes::ONLY_NULLS_WHILE_READING_SCHEMA;
}
ColumnsDescription readSchemaFromFormat(
const String & format_name,
const std::optional<FormatSettings> & format_settings,
ReadBufferCreator read_buffer_creator,
ContextPtr context,
ReadBufferIterator & read_buffer_iterator,
bool retry,
ContextPtr & context,
std::unique_ptr<ReadBuffer> & buf_out)
{
NamesAndTypesList names_and_types;
@ -63,20 +71,43 @@ ColumnsDescription readSchemaFromFormat(
}
else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
{
buf_out = read_buffer_creator();
if (buf_out->eof())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file, file is empty", format_name);
std::string exception_messages;
SchemaReaderPtr schema_reader;
std::unique_ptr<ReadBuffer> buf;
while ((buf = read_buffer_iterator()))
{
if (buf->eof())
{
auto exception_message = fmt::format("Cannot extract table structure from {} format file, file is empty", format_name);
auto schema_reader = FormatFactory::instance().getSchemaReader(format_name, *buf_out, context, format_settings);
try
{
names_and_types = schema_reader->readSchema();
}
catch (const DB::Exception & e)
{
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message());
if (!retry)
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, exception_message);
exception_messages += "\n" + exception_message;
continue;
}
try
{
schema_reader = FormatFactory::instance().getSchemaReader(format_name, *buf, context, format_settings);
names_and_types = schema_reader->readSchema();
buf_out = std::move(buf);
break;
}
catch (...)
{
auto exception_message = getCurrentExceptionMessage(false);
if (!retry || !isRetryableSchemaInferenceError(getCurrentExceptionCode()))
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, exception_message);
exception_messages += "\n" + exception_message;
}
}
if (names_and_types.empty())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:{}", exception_messages);
/// If we have "INSERT SELECT" query then try to order
/// columns as they are ordered in table schema for formats
/// without strict column order (like JSON and TSKV).
@ -99,10 +130,10 @@ ColumnsDescription readSchemaFromFormat(
return ColumnsDescription(names_and_types);
}
ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional<FormatSettings> & format_settings, ReadBufferCreator read_buffer_creator, ContextPtr context)
ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional<FormatSettings> & format_settings, ReadBufferIterator & read_buffer_iterator, bool retry, ContextPtr & context)
{
std::unique_ptr<ReadBuffer> buf_out;
return readSchemaFromFormat(format_name, format_settings, read_buffer_creator, context, buf_out);
return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, retry, context, buf_out);
}
DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type)

View File

@ -6,27 +6,32 @@
namespace DB
{
using ReadBufferIterator = std::function<std::unique_ptr<ReadBuffer>()>;
/// Try to determine the schema of the data in the specified format.
/// For formats that have an external schema reader, it will
/// use it and won't create a read buffer.
/// For formats that have a schema reader from the data,
/// read buffer will be created by the provided creator and
/// the schema will be extracted from the data.
/// If format doesn't have any schema reader or a schema reader
/// couldn't determine the schema, an exception will be thrown.
using ReadBufferCreator = std::function<std::unique_ptr<ReadBuffer>()>;
/// read buffer will be created by the provided iterator and
/// the schema will be extracted from the data. If the schema reader
/// couldn't determine the schema, we will try the next read buffer
/// from the provided iterator if it makes sense. If the format doesn't
/// have any schema reader or we couldn't determine the schema,
/// an exception will be thrown.
ColumnsDescription readSchemaFromFormat(
const String & format_name,
const std::optional<FormatSettings> & format_settings,
ReadBufferCreator read_buffer_creator,
ContextPtr context);
ReadBufferIterator & read_buffer_iterator,
bool retry,
ContextPtr & context);
/// If ReadBuffer is created, it will be written to buf_out.
ColumnsDescription readSchemaFromFormat(
const String & format_name,
const std::optional<FormatSettings> & format_settings,
ReadBufferCreator read_buffer_creator,
ContextPtr context,
ReadBufferIterator & read_buffer_iterator,
bool retry,
ContextPtr & context,
std::unique_ptr<ReadBuffer> & buf_out);
/// Make type Nullable recursively:
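For orientation, here is a hedged sketch of driving the new iterator-based overload declared above. The file names, the surrounding `context` and the use of `ReadBufferFromFile` are illustrative assumptions, not part of this commit:

``` cpp
// Sketch only: hand out one buffer per call; returning nullptr tells
// readSchemaFromFormat that there are no more buffers to try.
std::vector<String> paths = {"data/part1.ndjson", "data/part2.ndjson"}; // hypothetical files
size_t next = 0;
ReadBufferIterator read_buffer_iterator = [&]() -> std::unique_ptr<ReadBuffer>
{
    if (next == paths.size())
        return nullptr;
    return std::make_unique<ReadBufferFromFile>(paths[next++]); // IO/ReadBufferFromFile.h
};

// retry = true: on a retryable inference error (empty data, only NULLs) the next
// buffer from the iterator is tried before giving up.
ColumnsDescription columns = readSchemaFromFormat(
    "JSONEachRow", /* format_settings = */ std::nullopt, read_buffer_iterator, /* retry = */ true, context);
```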

View File

@ -193,6 +193,7 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
if (const auto * ptype = typeid_cast<const DataTypeDecimal<Decimal32> *>(&type)) return convertDecimalType(src, *ptype);
if (const auto * ptype = typeid_cast<const DataTypeDecimal<Decimal64> *>(&type)) return convertDecimalType(src, *ptype);
if (const auto * ptype = typeid_cast<const DataTypeDecimal<Decimal128> *>(&type)) return convertDecimalType(src, *ptype);
if (const auto * ptype = typeid_cast<const DataTypeDecimal<Decimal256> *>(&type)) return convertDecimalType(src, *ptype);
if (which_type.isEnum() && (src.getType() == Field::Types::UInt64 || src.getType() == Field::Types::Int64))
{

View File

@ -0,0 +1,33 @@
#include <Parsers/ParserAttachAccessEntity.h>
#include <Parsers/Access/ParserCreateQuotaQuery.h>
#include <Parsers/Access/ParserCreateRoleQuery.h>
#include <Parsers/Access/ParserCreateRowPolicyQuery.h>
#include <Parsers/Access/ParserCreateSettingsProfileQuery.h>
#include <Parsers/Access/ParserCreateUserQuery.h>
#include <Parsers/Access/ParserGrantQuery.h>
namespace DB
{
bool ParserAttachAccessEntity::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserCreateUserQuery create_user_p;
ParserCreateRoleQuery create_role_p;
ParserCreateRowPolicyQuery create_policy_p;
ParserCreateQuotaQuery create_quota_p;
ParserCreateSettingsProfileQuery create_profile_p;
ParserGrantQuery grant_p;
create_user_p.useAttachMode();
create_role_p.useAttachMode();
create_policy_p.useAttachMode();
create_quota_p.useAttachMode();
create_profile_p.useAttachMode();
grant_p.useAttachMode();
return create_user_p.parse(pos, node, expected) || create_role_p.parse(pos, node, expected)
|| create_policy_p.parse(pos, node, expected) || create_quota_p.parse(pos, node, expected)
|| create_profile_p.parse(pos, node, expected) || grant_p.parse(pos, node, expected);
}
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/// Special parser for the 'ATTACH access entity' queries.
class ParserAttachAccessEntity : public IParserBase
{
protected:
const char * getName() const override { return "ATTACH access entity query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}
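A hedged sketch of how this parser is typically driven through parseQuery; the query text below is an illustrative placeholder, not taken from this commit:

``` cpp
// Sketch only: turn a stored access-entity definition back into an AST.
String definition = "ATTACH USER test_user"; // hypothetical serialized entity
ParserAttachAccessEntity parser;
ASTPtr ast = parseQuery(
    parser, definition.data(), definition.data() + definition.size(),
    /* description = */ "access entity definition", /* max_query_size = */ 0, /* max_parser_depth = */ 0);
```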

View File

@ -3,13 +3,15 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/Access/ASTCreateUserQuery.h>
#include <Parsers/Access/ParserCreateUserQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserOptimizeQuery.h>
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/ParserAttachAccessEntity.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include "Access/AccessEntityIO.h"
#include <string_view>
#include <regex>
#include <gtest/gtest.h>

View File

@ -34,12 +34,12 @@ ExecutingGraph::Edge & ExecutingGraph::addEdge(Edges & edges, Edge edge, const I
{
auto it = processors_map.find(to);
if (it == processors_map.end())
{
String msg = "Processor " + to->getName() + " was found as " + (edge.backward ? "input" : "output")
+ " for processor " + from->getName() + ", but not found in list of processors.";
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Processor {} was found as {} for processor {}, but not found in list of processors",
to->getName(),
edge.backward ? "input" : "output",
from->getName());
edge.to = it->second;
auto & added_edge = edges.emplace_back(std::move(edge));
@ -128,8 +128,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
{
auto * processor = processors[nodes.size()].get();
if (processors_map.contains(processor))
throw Exception("Processor " + processor->getName() + " was already added to pipeline.",
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Processor {} was already added to pipeline", processor->getName());
processors_map[processor] = nodes.size();
nodes.emplace_back(std::make_unique<Node>(processor, nodes.size()));

View File

@ -28,6 +28,23 @@ void ExecutorTasks::rethrowFirstThreadException()
executor_context->rethrowExceptionIfHas();
}
void ExecutorTasks::tryWakeUpAnyOtherThreadWithTasks(ExecutionThreadContext & self, std::unique_lock<std::mutex> & lock)
{
if (!task_queue.empty() && !threads_queue.empty() && !finished)
{
size_t next_thread = self.thread_number + 1 == num_threads ? 0 : (self.thread_number + 1);
auto thread_to_wake = task_queue.getAnyThreadWithTasks(next_thread);
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.popAny();
lock.unlock();
executor_contexts[thread_to_wake]->wakeUp();
}
}
void ExecutorTasks::tryGetTask(ExecutionThreadContext & context)
{
{
@ -43,20 +60,7 @@ void ExecutorTasks::tryGetTask(ExecutionThreadContext & context)
if (context.hasTask())
{
if (!task_queue.empty() && !threads_queue.empty())
{
size_t next_thread = context.thread_number + 1 == num_threads ? 0 : (context.thread_number + 1);
auto thread_to_wake = task_queue.getAnyThreadWithTasks(next_thread);
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.popAny();
lock.unlock();
executor_contexts[thread_to_wake]->wakeUp();
}
tryWakeUpAnyOtherThreadWithTasks(context, lock);
return;
}
@ -120,20 +124,7 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
queue.pop();
}
if (!threads_queue.empty() && !task_queue.empty() && !finished)
{
size_t next_thread = context.thread_number + 1 == num_threads ? 0 : (context.thread_number + 1);
auto thread_to_wake = task_queue.getAnyThreadWithTasks(next_thread);
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.popAny();
lock.unlock();
executor_contexts[thread_to_wake]->wakeUp();
}
tryWakeUpAnyOtherThreadWithTasks(context, lock);
}
}

View File

@ -50,6 +50,7 @@ public:
void rethrowFirstThreadException();
void tryWakeUpAnyOtherThreadWithTasks(ExecutionThreadContext & self, std::unique_lock<std::mutex> & lock);
void tryGetTask(ExecutionThreadContext & context);
void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);

View File

@ -185,10 +185,10 @@ void PipelineExecutor::executeSingleThread(size_t thread_num)
auto & context = tasks.getThreadContext(thread_num);
LOG_TRACE(log,
"Thread finished. Total time: {} sec. Execution time: {} sec. Processing time: {} sec. Wait time: {} sec.",
(context.total_time_ns / 1e9),
(context.execution_time_ns / 1e9),
(context.processing_time_ns / 1e9),
(context.wait_time_ns / 1e9));
context.total_time_ns / 1e9,
context.execution_time_ns / 1e9,
context.processing_time_ns / 1e9,
context.wait_time_ns / 1e9);
#endif
}

View File

@ -77,7 +77,7 @@ size_t StreamingFormatExecutor::execute()
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:
case IProcessor::Status::ExpandPipeline:
throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Source processor returned status {}", IProcessor::statusToName(status));
}
}
}

View File

@ -25,7 +25,7 @@ public:
size_t getAnyThreadWithTasks(size_t from_thread = 0)
{
if (num_tasks == 0)
throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);
throw Exception("TaskQueue is empty", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < queues.size(); ++i)
{
@ -37,7 +37,7 @@ public:
from_thread = 0;
}
throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);
throw Exception("TaskQueue is empty", ErrorCodes::LOGICAL_ERROR);
}
Task * pop(size_t thread_num)

View File

@ -36,7 +36,7 @@ struct ThreadsQueue
void push(size_t thread)
{
if (unlikely(has(thread)))
throw Exception("Can't push thread because it is already in threads queue.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't push thread because it is already in threads queue", ErrorCodes::LOGICAL_ERROR);
swapThreads(thread, stack[stack_size]);
++stack_size;
@ -45,7 +45,7 @@ struct ThreadsQueue
void pop(size_t thread)
{
if (unlikely(!has(thread)))
throw Exception("Can't pop thread because it is not in threads queue.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't pop thread because it is not in threads queue", ErrorCodes::LOGICAL_ERROR);
--stack_size;
swapThreads(thread, stack[stack_size]);
@ -54,7 +54,7 @@ struct ThreadsQueue
size_t popAny()
{
if (unlikely(stack_size == 0))
throw Exception("Can't pop from empty queue.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't pop from empty queue", ErrorCodes::LOGICAL_ERROR);
--stack_size;
return stack[stack_size];

View File

@ -8,7 +8,10 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int ONLY_NULLS_WHILE_READING_SCHEMA;
extern const int TYPE_MISMATCH;
extern const int INCORRECT_DATA;
extern const int EMPTY_DATA_PASSED;
}
static void chooseResultType(
@ -36,7 +39,7 @@ static void chooseResultType(
type = default_type;
else
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
ErrorCodes::TYPE_MISMATCH,
"Automatically defined type {} for column {} in row {} differs from type defined by previous rows: {}",
type->getName(),
column_name,
@ -51,7 +54,7 @@ static void checkTypeAndAppend(NamesAndTypesList & result, DataTypePtr & type, c
{
if (!default_type)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
ErrorCodes::ONLY_NULLS_WHILE_READING_SCHEMA,
"Cannot determine table structure by first {} rows of data, because some columns contain only Nulls. To increase the maximum "
"number of rows to read for structure determination, use setting input_format_max_rows_to_read_for_schema_inference",
max_rows_to_read);
@ -100,7 +103,7 @@ NamesAndTypesList IRowSchemaReader::readSchema()
break;
if (new_data_types.size() != data_types.size())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Rows have different amount of values");
throw Exception(ErrorCodes::INCORRECT_DATA, "Rows have different amount of values");
for (size_t i = 0; i != data_types.size(); ++i)
{
@ -114,7 +117,7 @@ NamesAndTypesList IRowSchemaReader::readSchema()
/// Check that we read at least one column.
if (data_types.empty())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot read rows from the data");
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "Cannot read rows from the data");
/// If column names weren't set, use default names 'c1', 'c2', ...
if (column_names.empty())
@ -126,7 +129,7 @@ NamesAndTypesList IRowSchemaReader::readSchema()
/// If column names were set, check that the number of names match the number of types.
else if (column_names.size() != data_types.size())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
ErrorCodes::INCORRECT_DATA,
"The number of column names {} differs with the number of types {}", column_names.size(), data_types.size());
NamesAndTypesList result;
@ -192,7 +195,7 @@ NamesAndTypesList IRowWithNamesSchemaReader::readSchema()
/// Check that we read at least one column.
if (names_to_types.empty())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot read rows from the data");
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "Cannot read rows from the data");
NamesAndTypesList result;
for (auto & name : names_order)

View File

@ -11,7 +11,6 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
RowInputFormatWithNamesAndTypes::RowInputFormatWithNamesAndTypes(
@ -319,7 +318,7 @@ NamesAndTypesList FormatWithNamesAndTypesSchemaReader::readSchema()
std::vector<String> data_type_names = format_reader->readTypes();
if (data_type_names.size() != names.size())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
ErrorCodes::INCORRECT_DATA,
"The number of column names {} differs with the number of types {}", names.size(), data_type_names.size());
NamesAndTypesList result;

View File

@ -67,8 +67,7 @@ void IInflatingTransform::work()
if (can_generate)
{
if (generated)
throw Exception("IInflatingTransform cannot consume chunk because it already was generated",
ErrorCodes::LOGICAL_ERROR);
throw Exception("IInflatingTransform cannot consume chunk because it already was generated", ErrorCodes::LOGICAL_ERROR);
current_chunk = generate();
generated = true;
@ -77,8 +76,7 @@ void IInflatingTransform::work()
else
{
if (!has_input)
throw Exception("IInflatingTransform cannot consume chunk because it wasn't read",
ErrorCodes::LOGICAL_ERROR);
throw Exception("IInflatingTransform cannot consume chunk because it wasn't read", ErrorCodes::LOGICAL_ERROR);
consume(std::move(current_chunk));
has_input = false;

View File

@ -178,7 +178,7 @@ public:
*/
virtual Status prepare()
{
throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'prepare' is not implemented for {} processor", getName());
}
using PortNumbers = std::vector<UInt64>;
@ -193,7 +193,7 @@ public:
*/
virtual void work()
{
throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'work' is not implemented for {} processor", getName());
}
/** Executor must call this method when 'prepare' returned Async.
@ -212,7 +212,7 @@ public:
*/
virtual int schedule()
{
throw Exception("Method 'schedule' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'schedule' is not implemented for {} processor", getName());
}
/** You must call this method if 'prepare' returned ExpandPipeline.
@ -226,7 +226,7 @@ public:
*/
virtual Processors expandPipeline()
{
throw Exception("Method 'expandPipeline' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'expandPipeline' is not implemented for {} processor", getName());
}
/// In case if query was cancelled executor will wait till all processors finish their jobs.
@ -258,7 +258,7 @@ public:
++number;
}
throw Exception("Can't find input port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find input port for {} processor", getName());
}
UInt64 getOutputPortNumber(const OutputPort * output_port) const
@ -272,7 +272,7 @@ public:
++number;
}
throw Exception("Can't find output port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find output port for {} processor", getName());
}
const auto & getInputs() const { return inputs; }

View File

@ -46,7 +46,7 @@ public:
virtual void transform(Chunk &)
{
throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method transform is not implemented for {}", getName());
}
Status prepare() override;

View File

@ -19,7 +19,7 @@ LimitTransform::LimitTransform(
, with_ties(with_ties_), description(std::move(description_))
{
if (num_streams != 1 && with_ties)
throw Exception("Cannot use LimitTransform with multiple ports and ties.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot use LimitTransform with multiple ports and ties", ErrorCodes::LOGICAL_ERROR);
ports_data.resize(num_streams);
@ -86,8 +86,7 @@ IProcessor::Status LimitTransform::prepare(
return;
default:
throw Exception(
"Unexpected status for LimitTransform::preparePair : " + IProcessor::statusToName(status),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR, "Unexpected status for LimitTransform::preparePair : {}", IProcessor::statusToName(status));
}
};
@ -126,7 +125,7 @@ IProcessor::Status LimitTransform::prepare(
LimitTransform::Status LimitTransform::prepare()
{
if (ports_data.size() != 1)
throw Exception("prepare without arguments is not supported for multi-port LimitTransform.",
throw Exception("prepare without arguments is not supported for multi-port LimitTransform",
ErrorCodes::LOGICAL_ERROR);
return prepare({0}, {0});

View File

@ -153,7 +153,7 @@ struct RowRef
return true;
}
bool hasEqualSortColumnsWith(const RowRef & other)
bool hasEqualSortColumnsWith(const RowRef & other) const
{
return checkEquals(num_columns, sort_columns, row_num, other.sort_columns, other.row_num);
}
@ -197,7 +197,7 @@ struct RowRefWithOwnedChunk
sort_columns = &owned_chunk->sort_columns;
}
bool hasEqualSortColumnsWith(const RowRefWithOwnedChunk & other)
bool hasEqualSortColumnsWith(const RowRefWithOwnedChunk & other) const
{
return RowRef::checkEquals(sort_columns->size(), sort_columns->data(), row_num,
other.sort_columns->data(), other.row_num);

View File

@ -32,9 +32,7 @@ OffsetTransform::OffsetTransform(
}
IProcessor::Status OffsetTransform::prepare(
const PortNumbers & updated_input_ports,
const PortNumbers & updated_output_ports)
IProcessor::Status OffsetTransform::prepare(const PortNumbers & updated_input_ports, const PortNumbers & updated_output_ports)
{
bool has_full_port = false;
@ -63,9 +61,7 @@ IProcessor::Status OffsetTransform::prepare(
return;
default:
throw Exception(
"Unexpected status for OffsetTransform::preparePair : " + IProcessor::statusToName(status),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR, "Unexpected status for OffsetTransform::preparePair : {}", IProcessor::statusToName(status));
}
};
@ -88,7 +84,7 @@ IProcessor::Status OffsetTransform::prepare(
OffsetTransform::Status OffsetTransform::prepare()
{
if (ports_data.size() != 1)
throw Exception("prepare without arguments is not supported for multi-port OffsetTransform.",
throw Exception("prepare without arguments is not supported for multi-port OffsetTransform",
ErrorCodes::LOGICAL_ERROR);
return prepare({0}, {0});

View File

@ -16,7 +16,7 @@ void connect(OutputPort & output, InputPort & input)
auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();
assertCompatibleHeader(output.getHeader(), input.getHeader(), " function connect between " + out_name + " and " + in_name);
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format(" function connect between {} and {}", out_name, in_name));
input.output_port = &output;
output.input_port = &input;

View File

@ -89,7 +89,7 @@ protected:
DataPtr() : data(new Data())
{
if (unlikely((getUInt(data) & FLAGS_MASK) != 0))
throw Exception("Not alignment memory for Port.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Not alignment memory for Port", ErrorCodes::LOGICAL_ERROR);
}
/// Pointer can store flags in case of exception in swap.
~DataPtr() { delete getPtr(getUInt(data) & PTR_MASK); }
@ -133,7 +133,7 @@ protected:
State() : data(new Data())
{
if (unlikely((getUInt(data) & FLAGS_MASK) != 0))
throw Exception("Not alignment memory for Port.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Not alignment memory for Port", ErrorCodes::LOGICAL_ERROR);
}
~State()
@ -160,7 +160,7 @@ protected:
/// throw Exception("Cannot push block to port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (unlikely(flags & HAS_DATA))
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot push block to port which already has data", ErrorCodes::LOGICAL_ERROR);
}
void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags, bool set_not_needed = false)
@ -174,10 +174,10 @@ protected:
/// It's ok to check because this flag can be changed only by pulling thread.
if (unlikely((flags & IS_NEEDED) == 0) && !set_not_needed)
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot pull block from port which is not needed", ErrorCodes::LOGICAL_ERROR);
if (unlikely((flags & HAS_DATA) == 0))
throw Exception("Cannot pull block from port which has no data.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot pull block from port which has no data", ErrorCodes::LOGICAL_ERROR);
}
std::uintptr_t ALWAYS_INLINE setFlags(std::uintptr_t flags, std::uintptr_t mask)
@ -289,13 +289,15 @@ public:
{
auto & chunk = data->chunk;
String msg = "Invalid number of columns in chunk pulled from OutputPort. Expected "
+ std::to_string(header.columns()) + ", found " + std::to_string(chunk.getNumColumns()) + '\n';
msg += "Header: " + header.dumpStructure() + '\n';
msg += "Chunk: " + chunk.dumpStructure() + '\n';
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns in chunk pulled from OutputPort. Expected {}, found {}\n"
"Header: {}\n"
"Chunk: {}\n",
header.columns(),
chunk.getNumColumns(),
header.dumpStructure(),
chunk.dumpStructure());
}
return std::move(*data);
@ -403,14 +405,15 @@ public:
{
if (unlikely(!data_.exception && data_.chunk.getNumColumns() != header.columns()))
{
String msg = "Invalid number of columns in chunk pushed to OutputPort. Expected "
+ std::to_string(header.columns())
+ ", found " + std::to_string(data_.chunk.getNumColumns()) + '\n';
msg += "Header: " + header.dumpStructure() + '\n';
msg += "Chunk: " + data_.chunk.dumpStructure() + '\n';
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns in chunk pushed to OutputPort. Expected {}, found {}\n"
"Header: {}\n"
"Chunk: {}\n",
header.columns(),
data_.chunk.getNumColumns(),
header.dumpStructure(),
data_.chunk.dumpStructure());
}
updateVersion();

View File

@ -64,22 +64,25 @@ void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<std::unique_ptr<Qu
const auto & inputs = step->getInputStreams();
size_t num_inputs = step->getInputStreams().size();
if (num_inputs != plans.size())
{
throw Exception("Cannot unite QueryPlans using " + step->getName() +
" because step has different number of inputs. "
"Has " + std::to_string(plans.size()) + " plans "
"and " + std::to_string(num_inputs) + " inputs", ErrorCodes::LOGICAL_ERROR);
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot unite QueryPlans using {} because step has different number of inputs. Has {} plans and {} inputs",
step->getName(),
plans.size(),
num_inputs);
for (size_t i = 0; i < num_inputs; ++i)
{
const auto & step_header = inputs[i].header;
const auto & plan_header = plans[i]->getCurrentDataStream().header;
if (!blocksHaveEqualStructure(step_header, plan_header))
throw Exception("Cannot unite QueryPlans using " + step->getName() + " because "
"it has incompatible header with plan " + root->step->getName() + " "
"plan header: " + plan_header.dumpStructure() +
"step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot unite QueryPlans using {} because it has incompatible header with plan {} plan header: {} step header: {}",
step->getName(),
root->step->getName(),
plan_header.dumpStructure(),
step_header.dumpStructure());
}
for (auto & plan : plans)
@ -108,8 +111,10 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
if (num_input_streams == 0)
{
if (isInitialized())
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"step has no inputs, but QueryPlan is already initialized", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add step {} to QueryPlan because step has no inputs, but QueryPlan is already initialized",
step->getName());
nodes.emplace_back(Node{.step = std::move(step)});
root = &nodes.back();
@ -119,25 +124,33 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
if (num_input_streams == 1)
{
if (!isInitialized())
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"step has input, but QueryPlan is not initialized", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add step {} to QueryPlan because step has input, but QueryPlan is not initialized",
step->getName());
const auto & root_header = root->step->getOutputStream().header;
const auto & step_header = step->getInputStreams().front().header;
if (!blocksHaveEqualStructure(root_header, step_header))
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"it has incompatible header with root step " + root->step->getName() + " "
"root header: " + root_header.dumpStructure() +
"step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add step {} to QueryPlan because it has incompatible header with root step {} root header: {} step header: {}",
step->getName(),
root->step->getName(),
root_header.dumpStructure(),
step_header.dumpStructure());
nodes.emplace_back(Node{.step = std::move(step), .children = {root}});
root = &nodes.back();
return;
}
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because it has " +
std::to_string(num_input_streams) + " inputs but " + std::to_string(isInitialized() ? 1 : 0) +
" input expected", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add step {} to QueryPlan because it has {} inputs but {} input expected",
step->getName(),
num_input_streams,
isInitialized() ? 1 : 0);
}
QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(

View File

@ -343,12 +343,12 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
inputs_with_data.pop();
if (input_with_data.waiting_output == -1)
throw Exception("No associated output for input with data.", ErrorCodes::LOGICAL_ERROR);
throw Exception("No associated output for input with data", ErrorCodes::LOGICAL_ERROR);
auto & waiting_output = output_ports[input_with_data.waiting_output];
if (waiting_output.status == OutputStatus::NotActive)
throw Exception("Invalid status NotActive for associated output.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Invalid status NotActive for associated output", ErrorCodes::LOGICAL_ERROR);
if (waiting_output.status != OutputStatus::Finished)
{

View File

@ -337,8 +337,11 @@ void MySQLSource::initPositionMappingFromQueryResultStructure()
if (!settings->fetch_by_name)
{
if (description.sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
+ toString(description.sample_block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
throw Exception(
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH,
"mysqlxx::UseQueryResult contains {} columns while {} expected",
connection->result.getNumFields(),
description.sample_block.columns());
for (const auto idx : collections::range(0, connection->result.getNumFields()))
position_mapping[idx] = idx;
@ -362,18 +365,10 @@ void MySQLSource::initPositionMappingFromQueryResultStructure()
}
if (!missing_names.empty())
{
WriteBufferFromOwnString exception_message;
for (auto iter = missing_names.begin(); iter != missing_names.end(); ++iter)
{
if (iter != missing_names.begin())
exception_message << ", ";
exception_message << *iter;
}
throw Exception("mysqlxx::UseQueryResult must be contain the" + exception_message.str() + " columns.",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
}
throw Exception(
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH,
"mysqlxx::UseQueryResult must contain columns: {}",
fmt::join(missing_names, ", "));
}
}
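For reference, `fmt::join` renders any range with a separator, which is what replaces the hand-rolled comma loop above. A minimal self-contained sketch using only the {fmt} library (names are illustrative):

``` cpp
#include <string>
#include <vector>
#include <fmt/format.h>
#include <fmt/ranges.h>  // required for fmt::join over containers

int main()
{
    std::vector<std::string> missing_names{"id", "name", "created_at"};
    // Prints: mysqlxx::UseQueryResult must contain columns: id, name, created_at
    fmt::print("mysqlxx::UseQueryResult must contain columns: {}\n", fmt::join(missing_names, ", "));
}
```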

View File

@ -18,10 +18,7 @@ class SQLiteSource : public SourceWithProgress
using SQLitePtr = std::shared_ptr<sqlite3>;
public:
SQLiteSource(SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
UInt64 max_block_size_);
SQLiteSource(SQLitePtr sqlite_db_, const String & query_str_, const Block & sample_block, UInt64 max_block_size_);
String getName() const override { return "SQLite"; }

View File

@ -125,7 +125,7 @@ public:
ssize_t res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
if (-1 == res && errno != EINTR)
throwFromErrno("Cannot read from pipe ", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
if (res == 0)
break;
@ -187,7 +187,7 @@ public:
ssize_t res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
if ((-1 == res || 0 == res) && errno != EINTR)
throwFromErrno("Cannot write into pipe ", ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
throwFromErrno("Cannot write into pipe", ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
if (res > 0)
bytes_written += res;

View File

@ -12,23 +12,27 @@ namespace ErrorCodes
static void checkSingleInput(const IProcessor & transform)
{
if (transform.getInputs().size() != 1)
throw Exception("Transform for chain should have single input, "
"but " + transform.getName() + " has " +
toString(transform.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transform for chain should have single input, but {} has {} inputs",
transform.getName(),
transform.getInputs().size());
if (transform.getInputs().front().isConnected())
throw Exception("Transform for chain has connected input.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Transform for chain has connected input", ErrorCodes::LOGICAL_ERROR);
}
static void checkSingleOutput(const IProcessor & transform)
{
if (transform.getOutputs().size() != 1)
throw Exception("Transform for chain should have single output, "
"but " + transform.getName() + " has " +
toString(transform.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transform for chain should have single output, but {} has {} outputs",
transform.getName(),
transform.getOutputs().size());
if (transform.getOutputs().front().isConnected())
throw Exception("Transform for chain has connected input.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Transform for chain has connected output", ErrorCodes::LOGICAL_ERROR);
}
static void checkTransform(const IProcessor & transform)
@ -40,7 +44,7 @@ static void checkTransform(const IProcessor & transform)
static void checkInitialized(const std::list<ProcessorPtr> & processors)
{
if (processors.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Drain is not initialized");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chain is not initialized");
}
Chain::Chain(ProcessorPtr processor)
@ -61,15 +65,17 @@ Chain::Chain(std::list<ProcessorPtr> processors_) : processors(std::move(process
{
for (const auto & input : processor->getInputs())
if (&input != &getInputPort() && !input.isConnected())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot initialize chain because there is a not connected input for {}",
processor->getName());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot initialize chain because there is a disconnected input for {}",
processor->getName());
for (const auto & output : processor->getOutputs())
if (&output != &getOutputPort() && !output.isConnected())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot initialize chain because there is a not connected output for {}",
processor->getName());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot initialize chain because there is a disconnected output for {}",
processor->getName());
}
}

View File

@ -3,6 +3,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include "Core/Protocol.h"
#include <base/logger_useful.h>
@ -17,7 +18,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int UNEXPECTED_PACKET_FROM_SERVER;
}
std::unique_ptr<ConnectionCollector> ConnectionCollector::connection_collector;
@ -33,7 +34,7 @@ ConnectionCollector & ConnectionCollector::init(ContextMutablePtr global_context
{
if (connection_collector)
{
throw Exception("Connection collector is initialized twice. This is a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Connection collector is initialized twice. This is a bug", ErrorCodes::LOGICAL_ERROR);
}
connection_collector.reset(new ConnectionCollector(global_context_, max_threads));
@ -90,13 +91,13 @@ void ConnectionCollector::drainConnections(IConnections & connections, bool thro
break;
default:
/// Connection should be closed in case of unknown packet,
/// Connection should be closed in case of unexpected packet,
/// since this means that the connection is in some bad state.
is_drained = false;
throw Exception(
ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
"Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet {} from one of the following replicas: {}. (expected EndOfStream, Log, ProfileEvents or Exception)",
Protocol::Server::toString(packet.type),
connections.dumpAddresses());
}
}

View File

@ -61,15 +61,19 @@ void ExecutionSpeedLimits::throttle(
{
auto rows_per_second = read_rows / elapsed_seconds;
if (min_execution_rps && rows_per_second < min_execution_rps)
throw Exception("Query is executing too slow: " + toString(read_rows / elapsed_seconds)
+ " rows/sec., minimum: " + toString(min_execution_rps),
ErrorCodes::TOO_SLOW);
throw Exception(
ErrorCodes::TOO_SLOW,
"Query is executing too slow: {} rows/sec., minimum: {}",
read_rows / elapsed_seconds,
min_execution_rps);
auto bytes_per_second = read_bytes / elapsed_seconds;
if (min_execution_bps && bytes_per_second < min_execution_bps)
throw Exception("Query is executing too slow: " + toString(read_bytes / elapsed_seconds)
+ " bytes/sec., minimum: " + toString(min_execution_bps),
ErrorCodes::TOO_SLOW);
throw Exception(
ErrorCodes::TOO_SLOW,
"Query is executing too slow: {} bytes/sec., minimum: {}",
read_bytes / elapsed_seconds,
min_execution_bps);
/// If the predicted execution time is longer than `max_execution_time`.
if (max_execution_time != 0 && total_rows_to_read && read_rows)
@ -77,10 +81,12 @@ void ExecutionSpeedLimits::throttle(
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows_to_read) / read_rows);
if (estimated_execution_time_seconds > max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
+ " is too long. Maximum: " + toString(max_execution_time.totalSeconds())
+ ". Estimated rows to process: " + toString(total_rows_to_read),
ErrorCodes::TOO_SLOW);
throw Exception(
ErrorCodes::TOO_SLOW,
"Estimated query execution time ({} seconds) is too long. Maximum: {}. Estimated rows to process: {}",
estimated_execution_time_seconds,
max_execution_time.totalSeconds(),
total_rows_to_read);
}
if (max_execution_rps && rows_per_second >= max_execution_rps)
@ -92,12 +98,13 @@ void ExecutionSpeedLimits::throttle(
}
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
template <typename... Args>
static bool handleOverflowMode(OverflowMode mode, int code, fmt::format_string<Args...> fmt, Args &&... args)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
throw Exception(code, std::move(fmt), std::forward<Args>(args)...);
case OverflowMode::BREAK:
return false;
default:
@ -112,10 +119,12 @@ bool ExecutionSpeedLimits::checkTimeLimit(const Stopwatch & stopwatch, OverflowM
auto elapsed_ns = stopwatch.elapsed();
if (elapsed_ns > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(overflow_mode,
"Timeout exceeded: elapsed " + toString(static_cast<double>(elapsed_ns) / 1000000000ULL)
+ " seconds, maximum: " + toString(max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return handleOverflowMode(
overflow_mode,
ErrorCodes::TIMEOUT_EXCEEDED,
"Timeout exceeded: elapsed {} seconds, maximum: {}",
static_cast<double>(elapsed_ns) / 1000000000ULL,
max_execution_time.totalMicroseconds() / 1000000.0);
}
return true;

View File

@ -23,16 +23,18 @@ namespace ErrorCodes
static void checkSource(const IProcessor & source)
{
if (!source.getInputs().empty())
throw Exception("Source for pipe shouldn't have any input, but " + source.getName() + " has " +
toString(source.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for pipe shouldn't have any input, but {} has {} inputs",
source.getName(),
source.getInputs().size());
if (source.getOutputs().empty())
throw Exception("Source for pipe should have single output, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
if (source.getOutputs().size() > 1)
throw Exception("Source for pipe should have single output, but " + source.getName() + " has " +
toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
if (source.getOutputs().size() != 1)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for pipe should have single output, but {} has {} outputs",
source.getName(),
source.getOutputs().size());
}
static OutputPort * uniteExtremes(const OutputPortRawPtrs & ports, const Block & header, Processors & processors)
@ -112,8 +114,11 @@ PipelineResourcesHolder Pipe::detachResources()
Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes)
{
if (!source->getInputs().empty())
throw Exception("Source for pipe shouldn't have any input, but " + source->getName() + " has " +
toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for pipe shouldn't have any input, but {} has {} inputs",
source->getName(),
source->getInputs().size());
if (!output)
throw Exception("Cannot create Pipe from source because specified output port is nullptr",
@ -141,8 +146,7 @@ Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, Output
auto it = std::find_if(outputs.begin(), outputs.end(), [port](const OutputPort & p) { return &p == port; });
if (it == outputs.end())
throw Exception("Cannot create Pipe because specified " + name + " port does not belong to source",
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot create Pipe because specified {} port does not belong to source", name);
};
check_port_from_source(output, "output");
@ -150,9 +154,11 @@ Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, Output
check_port_from_source(extremes, "extremes");
if (num_specified_ports != outputs.size())
throw Exception("Cannot create Pipe from source because it has " + std::to_string(outputs.size()) +
" output ports, but " + std::to_string(num_specified_ports) + " were specified",
ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create Pipe from source because it has {} output ports, but {} were specified",
outputs.size(),
num_specified_ports);
}
totals_port = totals;
@ -188,14 +194,16 @@ Pipe::Pipe(Processors processors_) : processors(std::move(processors_))
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Cannot create Pipe because processor {} has disconnected input port", processor->getName());
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (!set.contains(connected_processor))
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create Pipe because processor {} has input port which is connected with unknown processor {}",
processor->getName(),
connected_processor->getName());
}
for (auto & port : processor->getOutputs())
@ -208,14 +216,16 @@ Pipe::Pipe(Processors processors_) : processors(std::move(processors_))
const auto * connected_processor = &port.getInputPort().getProcessor();
if (!set.contains(connected_processor))
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create Pipe because processor {} has output port which is connected with unknown processor {}",
processor->getName(),
connected_processor->getName());
}
}
if (output_ports.empty())
throw Exception("Cannot create Pipe because processors don't have any not-connected output ports",
throw Exception("Cannot create Pipe because processors don't have any disconnected output ports",
ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
@ -365,10 +375,10 @@ void Pipe::addSource(ProcessorPtr source)
void Pipe::addTotalsSource(ProcessorPtr source)
{
if (output_ports.empty())
throw Exception("Cannot add totals source to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot add totals source to empty Pipe", ErrorCodes::LOGICAL_ERROR);
if (totals_port)
throw Exception("Totals source was already added to Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Totals source was already added to Pipe", ErrorCodes::LOGICAL_ERROR);
checkSource(*source);
const auto & source_header = output_ports.front()->getHeader();
@ -385,10 +395,10 @@ void Pipe::addTotalsSource(ProcessorPtr source)
void Pipe::addExtremesSource(ProcessorPtr source)
{
if (output_ports.empty())
throw Exception("Cannot add extremes source to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot add extremes source to empty Pipe", ErrorCodes::LOGICAL_ERROR);
if (extremes_port)
throw Exception("Extremes source was already added to Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Extremes source was already added to Pipe", ErrorCodes::LOGICAL_ERROR);
checkSource(*source);
const auto & source_header = output_ports.front()->getHeader();
@ -435,20 +445,23 @@ void Pipe::addTransform(ProcessorPtr transform)
void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes)
{
if (output_ports.empty())
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot add transform to empty Pipe", ErrorCodes::LOGICAL_ERROR);
auto & inputs = transform->getInputs();
if (inputs.size() != output_ports.size())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"Processor has " + std::to_string(inputs.size()) + " input ports, "
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipe because it has {} input ports, but {} expected",
transform->getName(),
inputs.size(),
output_ports.size());
if (totals && totals_port)
throw Exception("Cannot add transform with totals to Pipe because it already has totals.",
throw Exception("Cannot add transform with totals to Pipe because it already has totals",
ErrorCodes::LOGICAL_ERROR);
if (extremes && extremes_port)
throw Exception("Cannot add transform with extremes to Pipe because it already has extremes.",
throw Exception("Cannot add transform with extremes to Pipe because it already has extremes",
ErrorCodes::LOGICAL_ERROR);
if (totals)
@ -515,21 +528,24 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes)
{
if (output_ports.empty())
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot add transform to empty Pipe", ErrorCodes::LOGICAL_ERROR);
auto & inputs = transform->getInputs();
size_t expected_inputs = output_ports.size() + (totals ? 1 : 0) + (extremes ? 1 : 0);
if (inputs.size() != expected_inputs)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"Processor has " + std::to_string(inputs.size()) + " input ports, "
"but " + std::to_string(expected_inputs) + " expected", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipe because it has {} input ports, but {} expected",
transform->getName(),
inputs.size(),
expected_inputs);
if (totals && !totals_port)
throw Exception("Cannot add transform consuming totals to Pipe because Pipe does not have totals.",
throw Exception("Cannot add transform consuming totals to Pipe because Pipe does not have totals",
ErrorCodes::LOGICAL_ERROR);
if (extremes && !extremes_port)
throw Exception("Cannot add transform consuming extremes to Pipe because it already has extremes.",
throw Exception("Cannot add transform consuming extremes to Pipe because it already has extremes",
ErrorCodes::LOGICAL_ERROR);
if (totals)
@ -561,17 +577,20 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort *
}
if (totals && !found_totals)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"specified totals port does not belong to it", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified totals port does not belong to it",
transform->getName());
if (extremes && !found_extremes)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"specified extremes port does not belong to it", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified extremes port does not belong to it",
transform->getName());
auto & outputs = transform->getOutputs();
if (outputs.empty())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because it has no outputs", transform->getName());
output_ports.clear();
output_ports.reserve(outputs.size());
@ -614,14 +633,18 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Processor for query pipeline transform should have single input, but {} has {} inputs",
transform->getName(),
transform->getInputs().size());
if (transform->getOutputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single output, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Processor for query pipeline transform should have single output, but {} has {} outputs",
transform->getName(),
transform->getOutputs().size());
}
const auto & out_header = transform ? transform->getOutputs().front().getHeader()
@ -661,10 +684,11 @@ void Pipe::addSimpleTransform(const ProcessorGetter & getter)
void Pipe::addChains(std::vector<Chain> chains)
{
if (output_ports.size() != chains.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot add chains to Pipe because "
"number of output ports ({}) is not equal to the number of chains ({})",
output_ports.size(), chains.size());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add chains to Pipe because number of output ports ({}) is not equal to the number of chains ({})",
output_ports.size(),
chains.size());
dropTotals();
dropExtremes();
@ -702,7 +726,7 @@ void Pipe::addChains(std::vector<Chain> chains)
void Pipe::resize(size_t num_streams, bool force, bool strict)
{
if (output_ports.empty())
throw Exception("Cannot resize an empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot resize an empty Pipe", ErrorCodes::LOGICAL_ERROR);
if (!force && num_streams == numOutputPorts())
return;
@ -720,7 +744,7 @@ void Pipe::resize(size_t num_streams, bool force, bool strict)
void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
{
if (output_ports.empty())
throw Exception("Cannot set sink to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot set sink to empty Pipe", ErrorCodes::LOGICAL_ERROR);
auto add_transform = [&](OutputPort *& stream, Pipe::StreamType stream_type)
{
@ -732,14 +756,18 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Sink for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Sink for query pipeline transform should have single input, but {} has {} inputs",
transform->getName(),
transform->getInputs().size());
if (!transform->getOutputs().empty())
throw Exception("Sink for query pipeline transform should have no outputs, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Sink for query pipeline transform should have no outputs, but {} has {} outputs",
transform->getName(),
transform->getOutputs().size());
}
if (!transform)
@ -762,7 +790,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
void Pipe::transform(const Transformer & transformer)
{
if (output_ports.empty())
throw Exception("Cannot transform empty Pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR);
auto new_processors = transformer(output_ports);
@ -774,8 +802,10 @@ void Pipe::transform(const Transformer & transformer)
for (const auto & port : output_ports)
{
if (!port->isConnected())
throw Exception("Transformation of Pipe is not valid because output port (" +
port->getHeader().dumpStructure() + ") is not connected", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because output port ({})",
port->getHeader().dumpStructure());
set.emplace(&port->getProcessor());
}
@ -787,14 +817,18 @@ void Pipe::transform(const Transformer & transformer)
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has not connected input port",
processor->getName());
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (!set.contains(connected_processor))
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has input port which is connected with unknown processor {}",
processor->getName(),
connected_processor->getName());
}
for (auto & port : processor->getOutputs())
@ -807,15 +841,17 @@ void Pipe::transform(const Transformer & transformer)
const auto * connected_processor = &port.getInputPort().getProcessor();
if (!set.contains(connected_processor))
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has output port which is connected with unknown processor {}",
processor->getName(),
connected_processor->getName());
}
}
if (output_ports.empty())
throw Exception("Transformation of Pipe is not valid because processors don't have any "
"not-connected output ports", ErrorCodes::LOGICAL_ERROR);
throw Exception(
"Transformation of Pipe is not valid because processors don't have any disconnected output ports", ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
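
Most of the changes in this file replace string-concatenated messages with the fmt-style `Exception(code, format, args...)` overload. A self-contained sketch of that pattern using the {fmt} library directly with a toy exception type (assumed available; the real DB::Exception constructor differs):

```cpp
#include <fmt/format.h>
#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>

struct ToyException : std::runtime_error
{
    int code;

    // Variadic constructor in the same spirit as the fmt-style Exception overload:
    // the format string is checked against the argument types at compile time.
    template <typename... Args>
    ToyException(int code_, fmt::format_string<Args...> format_str, Args &&... args)
        : std::runtime_error(fmt::format(format_str, std::forward<Args>(args)...)), code(code_)
    {
    }
};

int main()
{
    try
    {
        std::string name = "ConcatProcessor"; // hypothetical processor name
        std::size_t inputs = 3;
        // No manual toString()/concatenation; {fmt} renders the arguments.
        throw ToyException(1, "Source for pipe shouldn't have any input, but {} has {} inputs", name, inputs);
    }
    catch (const ToyException & e)
    {
        std::cout << "code " << e.code << ": " << e.what() << "\n";
    }
}
```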

View File

@ -18,7 +18,7 @@ struct PipelineResourcesHolder
PipelineResourcesHolder();
PipelineResourcesHolder(PipelineResourcesHolder &&) noexcept;
~PipelineResourcesHolder();
/// Custom mode assignment does not destroy data from lhs. It appends data from rhs to lhs.
/// Custom move assignment does not destroy data from lhs. It appends data from rhs to lhs.
PipelineResourcesHolder& operator=(PipelineResourcesHolder &&) noexcept;
/// Some processors may implicitly use Context or temporary Storage created by Interpreter.
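
A minimal sketch of the append-on-move-assignment behaviour the corrected comment describes, with hypothetical types rather than the real PipelineResourcesHolder:

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct ResourcesHolder
{
    std::vector<std::shared_ptr<std::string>> resources;

    ResourcesHolder() = default;
    ResourcesHolder(ResourcesHolder &&) noexcept = default;

    /// Custom move assignment: append rhs's resources instead of destroying lhs data.
    ResourcesHolder & operator=(ResourcesHolder && rhs) noexcept
    {
        for (auto & resource : rhs.resources)
            resources.push_back(std::move(resource));
        rhs.resources.clear();
        return *this;
    }
};

int main()
{
    ResourcesHolder a;
    a.resources.push_back(std::make_shared<std::string>("context"));

    ResourcesHolder b;
    b.resources.push_back(std::make_shared<std::string>("temporary storage"));

    a = std::move(b);                        // keeps "context" and gains "temporary storage"
    std::cout << a.resources.size() << "\n"; // 2
}
```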

View File

@ -33,7 +33,7 @@ static void checkInput(const InputPort & input, const ProcessorPtr & processor)
if (!input.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create QueryPipeline because {} has not connected input",
"Cannot create QueryPipeline because {} has disconnected input",
processor->getName());
}
@ -42,7 +42,7 @@ static void checkOutput(const OutputPort & output, const ProcessorPtr & processo
if (!output.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create QueryPipeline because {} has not connected output",
"Cannot create QueryPipeline because {} has disconnected output",
processor->getName());
}

View File

@ -37,7 +37,7 @@ void QueryPipelineBuilder::addQueryPlan(std::unique_ptr<QueryPlan> plan)
void QueryPipelineBuilder::checkInitialized()
{
if (!initialized())
throw Exception("QueryPipeline wasn't initialized.", ErrorCodes::LOGICAL_ERROR);
throw Exception("QueryPipeline is uninitialized", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipelineBuilder::checkInitializedAndNotCompleted()
@ -45,35 +45,44 @@ void QueryPipelineBuilder::checkInitializedAndNotCompleted()
checkInitialized();
if (pipe.isCompleted())
throw Exception("QueryPipeline was already completed.", ErrorCodes::LOGICAL_ERROR);
throw Exception("QueryPipeline is already completed", ErrorCodes::LOGICAL_ERROR);
}
static void checkSource(const ProcessorPtr & source, bool can_have_totals)
{
if (!source->getInputs().empty())
throw Exception("Source for query pipeline shouldn't have any input, but " + source->getName() + " has " +
toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for query pipeline shouldn't have any input, but {} has {} inputs",
source->getName(),
source->getInputs().size());
if (source->getOutputs().empty())
throw Exception("Source for query pipeline should have single output, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Source for query pipeline should have single output, but {} doesn't have any", source->getName());
if (!can_have_totals && source->getOutputs().size() != 1)
throw Exception("Source for query pipeline should have single output, but " + source->getName() + " has " +
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for query pipeline should have single output, but {} has {} outputs",
source->getName(),
source->getOutputs().size());
if (source->getOutputs().size() > 2)
throw Exception("Source for query pipeline should have 1 or 2 outputs, but " + source->getName() + " has " +
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Source for query pipeline should have 1 or 2 output, but {} has {} outputs",
source->getName(),
source->getOutputs().size());
}
void QueryPipelineBuilder::init(Pipe pipe_)
{
if (initialized())
throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Pipeline has already been initialized", ErrorCodes::LOGICAL_ERROR);
if (pipe_.empty())
throw Exception("Can't initialize pipeline with empty pipe.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't initialize pipeline with empty pipe", ErrorCodes::LOGICAL_ERROR);
pipe = std::move(pipe_);
}
@ -81,10 +90,10 @@ void QueryPipelineBuilder::init(Pipe pipe_)
void QueryPipelineBuilder::init(QueryPipeline pipeline)
{
if (initialized())
throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Pipeline has already been initialized", ErrorCodes::LOGICAL_ERROR);
if (pipeline.pushing())
throw Exception("Can't initialize pushing pipeline.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Can't initialize pushing pipeline", ErrorCodes::LOGICAL_ERROR);
pipe.holder = std::move(pipeline.resources);
pipe.processors = std::move(pipeline.processors);
@ -191,11 +200,10 @@ void QueryPipelineBuilder::addTotalsHavingTransform(ProcessorPtr transform)
checkInitializedAndNotCompleted();
if (!typeid_cast<const TotalsHavingTransform *>(transform.get()))
throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
ErrorCodes::LOGICAL_ERROR);
throw Exception("TotalsHavingTransform is expected for QueryPipeline::addTotalsHavingTransform", ErrorCodes::LOGICAL_ERROR);
if (pipe.getTotalsPort())
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Totals having transform was already added to pipeline", ErrorCodes::LOGICAL_ERROR);
resize(1);
@ -208,7 +216,7 @@ void QueryPipelineBuilder::addDefaultTotals()
checkInitializedAndNotCompleted();
if (pipe.getTotalsPort())
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Totals having transform was already added to pipeline", ErrorCodes::LOGICAL_ERROR);
const auto & current_header = getHeader();
Columns columns;
@ -461,7 +469,7 @@ void QueryPipelineBuilder::setProcessListElement(QueryStatus * elem)
PipelineExecutorPtr QueryPipelineBuilder::execute()
{
if (!isCompleted())
throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot execute pipeline because it is not completed", ErrorCodes::LOGICAL_ERROR);
return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element);
}

View File

@ -78,8 +78,10 @@ RemoteInserter::RemoteInserter(
/// client's already got this information for remote table. Ignore.
}
else
throw NetException("Unexpected packet from server (expected Data or Exception, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet from server (expected Data or Exception, got {})",
Protocol::Server::toString(packet.type));
}
}
@ -131,8 +133,10 @@ void RemoteInserter::onFinish()
// Do nothing
}
else
throw NetException("Unexpected packet from server (expected EndOfStream or Exception, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet from server (expected EndOfStream or Exception, got {})",
Protocol::Server::toString(packet.type));
}
finished = true;

View File

@ -356,7 +356,7 @@ std::variant<Block, int> RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs
else
return read(*read_context);
}
throw Exception("Found duplicate uuids while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
throw Exception("Found duplicate uuids while processing query", ErrorCodes::DUPLICATED_PART_UUIDS);
}
std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
@ -432,8 +432,10 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
default:
got_unknown_packet_from_replica = true;
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
throw Exception(
ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
"Unknown packet {} from one of the following replicas: {}",
packet.type,
connections->dumpAddresses());
}

View File

@ -12,12 +12,20 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int too_man
if (overflow_mode == OverflowMode::THROW)
{
if (max_rows && rows > max_rows)
throw Exception("Limit for " + std::string(what) + " exceeded, max rows: " + formatReadableQuantity(max_rows)
+ ", current rows: " + formatReadableQuantity(rows), too_many_rows_exception_code);
throw Exception(
too_many_rows_exception_code,
"Limit for {} exceeded, max rows: {}, current rows: {}",
what,
formatReadableQuantity(max_rows),
formatReadableQuantity(rows));
if (max_bytes && bytes > max_bytes)
throw Exception(fmt::format("Limit for {} exceeded, max bytes: {}, current bytes: {}",
std::string(what), ReadableSize(max_bytes), ReadableSize(bytes)), too_many_bytes_exception_code);
throw Exception(
too_many_bytes_exception_code,
"Limit for {} exceeded, max bytes: {}, current bytes: {}",
what,
ReadableSize(max_bytes),
ReadableSize(bytes));
return true;
}

View File

@ -1,8 +1,9 @@
#include <QueryPipeline/narrowPipe.h>
#include <random>
#include <Common/thread_local_rng.h>
#include <Processors/ConcatProcessor.h>
#include <QueryPipeline/Pipe.h>
#include "narrowBlockInputStreams.h"
namespace DB

View File

@ -9,7 +9,7 @@ namespace DB
class Pipe;
/** If the number of sources of `inputs` is greater than `width`,
* then glues the sources to each other (using ConcatBlockInputStream),
* then glues the sources to each other (using ConcatProcessor),
* so that the number of sources becomes no more than `width`.
*
* Trying to glue the sources with each other uniformly randomly.
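
A standalone sketch of the narrowing idea this header describes: shuffle the source indices and deal them round-robin into `width` groups, each of which would then be glued by one ConcatProcessor. Purely illustrative; not the real narrowPipe() implementation:

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <random>
#include <vector>

// Returns up to `width` groups of source indices, assigned uniformly at random.
std::vector<std::vector<size_t>> narrowPlan(size_t num_sources, size_t width, std::mt19937 & rng)
{
    std::vector<std::vector<size_t>> groups(std::min(width, num_sources));
    if (groups.empty())
        return groups;

    std::vector<size_t> order(num_sources);
    std::iota(order.begin(), order.end(), 0);
    std::shuffle(order.begin(), order.end(), rng); // uniformly random pairing of sources

    for (size_t i = 0; i < order.size(); ++i)
        groups[i % groups.size()].push_back(order[i]); // each group would feed one ConcatProcessor
    return groups;
}

int main()
{
    std::mt19937 rng(42);
    auto groups = narrowPlan(/*num_sources=*/10, /*width=*/3, rng);
    for (size_t g = 0; g < groups.size(); ++g)
    {
        std::cout << "group " << g << ":";
        for (size_t s : groups[g])
            std::cout << " " << s;
        std::cout << "\n";
    }
}
```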

View File

@ -182,41 +182,22 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
auto paths = getPathsList(path_from_uri, uri, ctx);
if (paths.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format))
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files in HDFS with provided path. You must "
"specify table structure manually",
format);
std::string exception_messages;
bool read_buffer_creator_was_used = false;
for (const auto & path : paths)
ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()]() mutable -> std::unique_ptr<ReadBuffer>
{
auto read_buffer_creator = [&, uri_without_path = uri_without_path]()
{
read_buffer_creator_was_used = true;
if (paths.empty())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files in HDFS with provided path. You must "
"specify table structure manually",
format);
auto compression = chooseCompressionMethod(path, compression_method);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromHDFS>(uri_without_path, path, ctx->getGlobalContext()->getConfigRef()), compression);
};
try
{
return readSchemaFromFormat(format, std::nullopt, read_buffer_creator, ctx);
}
catch (...)
{
if (paths.size() == 1 || !read_buffer_creator_was_used)
throw;
exception_messages += getCurrentExceptionMessage(false) + "\n";
}
}
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from hdfs files failed. Errors:\n{}", exception_messages);
if (it == paths.end())
return nullptr;
auto compression = chooseCompressionMethod(*it, compression_method);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression);
};
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
}
class HDFSSource::DisclosedGlobIterator::Impl
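
The refactoring above replaces the per-path try/catch loop with a ReadBufferIterator callback: it returns the next buffer, or nullptr once the paths are exhausted, and readSchemaFromFormat retries across the buffers it yields. A self-contained sketch of that retry contract, with hypothetical helper names and std::istringstream standing in for ReadBuffer:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

using ReadBufferIterator = std::function<std::unique_ptr<std::istringstream>()>;

// Toy stand-in for readSchemaFromFormat: try each buffer in turn, collect errors,
// and only give up once the iterator returns nullptr.
std::string readSchemaWithRetries(const ReadBufferIterator & next_buffer, bool retry)
{
    std::string errors;
    while (auto buf = next_buffer())
    {
        try
        {
            std::string header;
            if (!std::getline(*buf, header) || header.empty())
                throw std::runtime_error("file is empty");
            return header; // "schema" inferred from the first line
        }
        catch (const std::exception & e)
        {
            if (!retry)
                throw;
            errors += std::string(e.what()) + "\n";
        }
    }
    throw std::runtime_error("All attempts to extract table structure failed:\n" + errors);
}

int main()
{
    std::vector<std::string> files = {"", "", "x UInt64, y String"}; // first two are "empty"
    ReadBufferIterator read_buffer_iterator = [&, it = files.begin()]() mutable -> std::unique_ptr<std::istringstream>
    {
        if (it == files.end())
            return nullptr; // signals "no more files" to the caller
        return std::make_unique<std::istringstream>(*it++);
    };

    std::cout << readSchemaWithRetries(read_buffer_iterator, files.size() > 1) << "\n";
}
```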

View File

@ -11,7 +11,7 @@
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/narrowBlockInputStreams.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/RemoteQueryExecutor.h>

View File

@ -5256,6 +5256,10 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
if (select_query->join())
return std::nullopt;
// INTERPOLATE expressions may include aliases, so aliases should be preserved
if (select_query->interpolate() && !select_query->interpolate()->children.empty())
return std::nullopt;
auto query_options = SelectQueryOptions(
QueryProcessingStage::WithMergeableState,
/* depth */ 1,

View File

@ -237,7 +237,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c
/// in case of file descriptor we have a stream of data and we cannot
/// start reading data from the beginning after reading some data for
/// schema inference.
auto read_buffer_creator = [&]()
ReadBufferIterator read_buffer_iterator = [&]()
{
/// We will use PeekableReadBuffer to create a checkpoint, so we need a place
/// where we can store the original read buffer.
@ -247,7 +247,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c
return read_buf;
};
auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_creator, context, peekable_read_buffer_from_fd);
auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, false, context, peekable_read_buffer_from_fd);
if (peekable_read_buffer_from_fd)
{
/// If we have created read buffer in readSchemaFromFormat we should rollback to checkpoint.
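
A minimal sketch of the checkpoint/rollback idea behind wrapping the descriptor in a PeekableReadBuffer: since the stream cannot be re-read from the beginning, the rows consumed for schema inference are retained so the actual read can replay them. Hypothetical types; not the real PeekableReadBuffer API:

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct PeekableSource
{
    std::vector<std::string> lines; // stands in for a non-seekable stream that we buffer
    size_t pos = 0;
    std::optional<size_t> checkpoint;

    void setCheckpoint() { checkpoint = pos; }
    void rollbackToCheckpoint() { pos = checkpoint.value(); }

    const std::string * next()
    {
        if (pos >= lines.size())
            return nullptr;
        return &lines[pos++];
    }
};

int main()
{
    PeekableSource source{{"1\tfoo", "2\tbar", "3\tbaz"}};

    // "Schema inference": peek at the first row, then roll back.
    source.setCheckpoint();
    if (const auto * row = source.next())
        std::cout << "inferred from: " << *row << "\n";
    source.rollbackToCheckpoint();

    // The actual read sees every row again, including the one used for inference.
    while (const auto * row = source.next())
        std::cout << "read: " << *row << "\n";
}
```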
@ -274,38 +274,22 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return ColumnsDescription(source->getOutputs().front().getHeader().getNamesAndTypesList());
}
std::string exception_messages;
bool read_buffer_creator_was_used = false;
for (const auto & path : paths)
if (paths.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format))
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path. You must specify "
"table structure manually",
format);
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()]() mutable -> std::unique_ptr<ReadBuffer>
{
auto read_buffer_creator = [&]()
{
read_buffer_creator_was_used = true;
if (it == paths.end())
return nullptr;
if (!std::filesystem::exists(path))
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path. You must specify "
"table structure manually",
format);
return createReadBuffer(*it++, false, "File", -1, compression_method, context);
};
return createReadBuffer(path, false, "File", -1, compression_method, context);
};
try
{
return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
}
catch (...)
{
if (paths.size() == 1 || !read_buffer_creator_was_used)
throw;
exception_messages += getCurrentExceptionMessage(false) + "\n";
}
}
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:\n{}", exception_messages);
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
}
bool StorageFile::isColumnOriented() const

View File

@ -1,4 +1,4 @@
#include <QueryPipeline/narrowBlockInputStreams.h>
#include <QueryPipeline/narrowPipe.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageFactory.h>

View File

@ -36,7 +36,7 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/narrowBlockInputStreams.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
@ -946,45 +946,31 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx)
{
std::vector<String> keys = {s3_configuration.uri.key};
auto file_iterator = createFileIterator(s3_configuration, keys, is_key_with_globs, distributed_processing, ctx);
auto file_iterator = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx);
std::string current_key;
std::string exception_messages;
bool read_buffer_creator_was_used = false;
do
ReadBufferIterator read_buffer_iterator = [&, first = true]() mutable -> std::unique_ptr<ReadBuffer>
{
current_key = (*file_iterator)();
auto read_buffer_creator = [&]()
auto key = (*file_iterator)();
if (key.empty())
{
read_buffer_creator_was_used = true;
if (current_key.empty())
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path in S3. You must specify "
"table structure manually",
format);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
s3_configuration.client, s3_configuration.uri.bucket, current_key, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
chooseCompressionMethod(current_key, compression_method));
};
try
{
return readSchemaFromFormat(format, format_settings, read_buffer_creator, ctx);
return nullptr;
}
catch (...)
{
if (!is_key_with_globs || !read_buffer_creator_was_used)
throw;
exception_messages += getCurrentExceptionMessage(false) + "\n";
}
} while (!current_key.empty());
first = false;
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
chooseCompressionMethod(key, compression_method));
};
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from s3 files failed. Errors:\n{}", exception_messages);
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);
}

View File

@ -21,7 +21,7 @@
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/narrowBlockInputStreams.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include "Processors/Sources/SourceWithProgress.h"
#include <Processors/Sources/RemoteSource.h>

View File

@ -43,7 +43,6 @@ namespace ErrorCodes
extern const int NETWORK_ERROR;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
@ -551,49 +550,31 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
urls_to_check = {uri};
}
String exception_messages;
bool read_buffer_creator_was_used = false;
std::vector<String>::const_iterator option = urls_to_check.begin();
do
ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()]() mutable -> std::unique_ptr<ReadBuffer>
{
auto read_buffer_creator = [&]()
{
read_buffer_creator_was_used = true;
return StorageURLSource::getFirstAvailableURLReadBuffer(
option,
urls_to_check.end(),
context,
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
compression_method,
credentials,
headers,
false,
false,
context->getSettingsRef().max_download_threads);
};
if (it == urls_to_check.cend())
return nullptr;
try
{
return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
}
catch (...)
{
if (urls_to_check.size() == 1 || !read_buffer_creator_was_used)
throw;
auto buf = StorageURLSource::getFirstAvailableURLReadBuffer(
it,
urls_to_check.cend(),
context,
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
compression_method,
credentials,
headers,
false,
false,
context->getSettingsRef().max_download_threads);
++it;
return buf;
};
exception_messages += getCurrentExceptionMessage(false) + "\n";
}
} while (++option < urls_to_check.end());
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"All attempts to extract table structure from urls failed. Errors:\n{}",
exception_messages);
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
}
bool IStorageURLBase::isColumnOriented() const

View File

@ -49,12 +49,11 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const
{
auto read_buffer_creator = [&]()
ReadBufferIterator read_buffer_iterator = [&]()
{
return std::make_unique<ReadBufferFromString>(data);
};
return readSchemaFromFormat(format, std::nullopt, read_buffer_creator, context);
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, false, context);
}
Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr context) const

View File

@ -541,6 +541,30 @@ def test_schema_inference_with_globs(started_cluster):
)
assert sorted(result.split()) == ["0", "\\N"]
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data3.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
)
filename = "data{1,3}.jsoncompacteachrow"
result = node1.query_and_get_error(f"desc hdfs('hdfs://hdfs1:9000/{filename}')")
assert "All attempts to extract table structure from files failed" in result
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
)
url_filename = "data{0,1,2,3}.jsoncompacteachrow"
result = node1.query_and_get_error(
f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow')"
)
assert (
"Cannot extract table structure from JSONCompactEachRow format file" in result
)
def test_insert_select_schema_inference(started_cluster):
node1.query(

View File

@ -697,6 +697,11 @@ def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
# FIXME (kssenii) temporarily disabled
if instance.is_built_with_address_sanitizer():
pytest.skip("Temporarily disabled (FIXME)")
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,

View File

@ -1325,6 +1325,46 @@ def test_schema_inference_from_globs(started_cluster):
)
assert sorted(result.split()) == ["0", "\\N"]
instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test3.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
)
url_filename = "test{1,3}.jsoncompacteachrow"
result = instance.query_and_get_error(
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')"
)
assert "All attempts to extract table structure from files failed" in result
result = instance.query_and_get_error(
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')"
)
assert "All attempts to extract table structure from files failed" in result
instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
)
result = instance.query_and_get_error(
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow')"
)
assert (
"Cannot extract table structure from JSONCompactEachRow format file" in result
)
url_filename = "test{0,1,2,3}.jsoncompacteachrow"
result = instance.query_and_get_error(
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')"
)
assert (
"Cannot extract table structure from JSONCompactEachRow format file" in result
)
def test_signatures(started_cluster):
bucket = started_cluster.minio_bucket

View File

@ -31,7 +31,7 @@
<query>
INSERT INTO FUNCTION file('test_file', '{format}', 'key UInt64, value UInt64')
SELECT number, number FROM numbers(1000000)
SELECT number, number FROM numbers(10000000)
</query>
<query>
@ -39,16 +39,18 @@
SELECT number, number, number, number, number, number FROM numbers(1000000)
</query>
<!-- Disabled in 36538, broken now
<query>
INSERT INTO FUNCTION file('test_file_{{_partition_id}}', '{format}', 'partition_id UInt64, value UInt64')
PARTITION BY partition_id
SELECT (number % {partitions_count}) as partition_id, number FROM numbers(1000000)
SELECT (number % {partitions_count}) as partition_id, number FROM numbers(50000)
</query>
<query>
INSERT INTO FUNCTION file('test_file_{{_partition_id}}', '{format}', 'partition_id UInt64, value1 UInt64, value2 UInt64, value3 UInt64, value4 UInt64, value5 UInt64')
PARTITION BY partition_id
SELECT (number % {partitions_count}) as partition_id, number, number, number, number, number FROM numbers(1000000)
SELECT (number % {partitions_count}) as partition_id, number, number, number, number, number FROM numbers(50000)
</query>
-->
</test>

View File

@ -238,3 +238,9 @@ original 7
10.5 \N
11 \N
11.5 \N
1 1
2 2
3 2
4 3
5 4
6 3

View File

@ -70,3 +70,10 @@ SELECT n, source, inter + NULL AS inter_p FROM (
SELECT n, source, inter AS inter_p FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source, number + NULL AS inter FROM numbers(10) WHERE (number % 3) = 1
) ORDER BY n ASC WITH FILL FROM 0 TO 11.51 STEP 0.5 INTERPOLATE ( inter_p AS inter_p + 1 );
# Test INTERPOLATE for MergeTree
DROP TABLE IF EXISTS t_inter_02233;
CREATE TABLE t_inter_02233 (n Int32) ENGINE = MergeTree ORDER BY n;
INSERT INTO t_inter_02233 VALUES (1),(3),(3),(6),(6),(6);
SELECT n, count() AS m FROM t_inter_02233 GROUP BY n ORDER BY n WITH FILL INTERPOLATE ( m AS m + 1 );
DROP TABLE IF EXISTS t_inter_02233;

View File

@ -0,0 +1,2 @@
1
\N

View File

@ -0,0 +1,10 @@
-- Tags: no-fasttest, no-parallel
insert into function file('02267_data2.jsonl') select NULL as x;
insert into function file('02267_data3.jsonl') select * from numbers(0);
insert into function file('02267_data4.jsonl') select 1 as x;
select * from file('02267_data*.jsonl') order by x;
insert into function file('02267_data1.jsonl', 'TSV') select 1 as x;
insert into function file('02267_data1.jsonl', 'TSV') select [1,2,3] as x;
select * from file('02267_data*.jsonl'); --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE}

View File

@ -1,3 +1,4 @@
-- Tags: no-backward-compatibility-check:22.4.2
DROP TABLE IF EXISTS github_events;
CREATE TABLE github_events

View File

@ -0,0 +1,4 @@
1
2
3
4

View File

@ -0,0 +1,4 @@
SELECT * FROM VALUES('x Decimal32(0)', (1));
SELECT * FROM VALUES('x Decimal64(0)', (2));
SELECT * FROM VALUES('x Decimal128(0)', (3));
SELECT * FROM VALUES('x Decimal256(0)', (4));