Merge branch 'master' into startup-sanity-checks

This commit is contained in:
alesapin 2022-03-31 00:08:43 +02:00
commit 81ce991494
106 changed files with 1326 additions and 1666 deletions

View File

@ -149,7 +149,6 @@ jobs:
sudo rm -fr "$TEMP_PATH"
SplitBuildSmokeTest:
needs: [BuilderDebSplitted]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, style-checker]
steps:
- name: Set envs
@ -316,7 +315,6 @@ jobs:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinRelease:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
@ -362,7 +360,6 @@ jobs:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinGCC:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
@ -636,7 +633,6 @@ jobs:
##########################################################################################
BuilderDebSplitted:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
@ -682,7 +678,6 @@ jobs:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinTidy:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
@ -728,7 +723,6 @@ jobs:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinDarwin:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
@ -774,7 +768,6 @@ jobs:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinAarch64:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
@ -820,7 +813,6 @@ jobs:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinFreeBSD:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
@ -866,7 +858,6 @@ jobs:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinDarwinAarch64:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
@ -912,7 +903,6 @@ jobs:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinPPC64:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs

View File

@ -1,9 +1,12 @@
if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined")
set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF)
# During cross-compilation in our CI we have to use llvm-tblgen and other building tools
# tools to be build for host architecture and everything else for target architecture (e.g. AArch64)
# Possible workaround is to use llvm-tblgen from some package...
# But lets just enable LLVM for native builds
if (CMAKE_CROSSCOMPILING OR SANITIZE STREQUAL "undefined")
set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF)
else()
set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON)
set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON)
endif()
option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT})
if (NOT ENABLE_EMBEDDED_COMPILER)

View File

@ -1378,7 +1378,7 @@ $REF_SHA $SHA_TO_TEST $(numactl --hardware | sed -n 's/^available:[[:space:]]\+/
EOF
# Also insert some data about the check into the CI checks table.
"${client[@]}" --query "INSERT INTO "'"'"gh-data"'"'".checks FORMAT TSVWithNamesAndTypes" \
"${client[@]}" --query "INSERT INTO "'"'"default"'"'".checks FORMAT TSVWithNamesAndTypes" \
< ci-checks.tsv
set -x

View File

@ -8,7 +8,7 @@ toc_title: "版本折叠MergeTree"
这个引擎:
- 允许快速写入不断变化的对象状态。
- 删除后台中的旧对象状态。 这显降低了存储体积。
- 删除后台中的旧对象状态。 这显降低了存储体积。
请参阅部分 [崩溃](#table_engines_versionedcollapsingmergetree) 有关详细信息。

View File

@ -184,6 +184,11 @@ void LocalServer::tryInitPath()
if (path.back() != '/')
path += '/';
fs::create_directories(fs::path(path) / "user_defined/");
fs::create_directories(fs::path(path) / "data/");
fs::create_directories(fs::path(path) / "metadata/");
fs::create_directories(fs::path(path) / "metadata_dropped/");
global_context->setPath(path);
global_context->setTemporaryStorage(path + "tmp");
@ -565,7 +570,6 @@ void LocalServer::processConfig()
/// Lock path directory before read
status.emplace(fs::path(path) / "status", StatusFile::write_full_info);
fs::create_directories(fs::path(path) / "user_defined/");
LOG_DEBUG(log, "Loading user defined objects from {}", path);
Poco::File(path + "user_defined/").createDirectories();
UserDefinedSQLObjectsLoader::instance().loadObjects(global_context);
@ -573,9 +577,6 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loaded user defined objects.");
LOG_DEBUG(log, "Loading metadata from {}", path);
fs::create_directories(fs::path(path) / "data/");
fs::create_directories(fs::path(path) / "metadata/");
loadMetadataSystem(global_context);
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));

View File

@ -35,10 +35,10 @@ public:
{}
// Format message with fmt::format, like the logging functions.
template <typename ...Args>
Exception(int code, const std::string & fmt, Args&&... args)
: Exception(fmt::format(fmt::runtime(fmt), std::forward<Args>(args)...), code)
{}
template <typename... Args>
Exception(int code, fmt::format_string<Args...> fmt, Args &&... args) : Exception(fmt::format(fmt, std::forward<Args>(args)...), code)
{
}
struct CreateFromPocoTag {};
struct CreateFromSTDTag {};
@ -52,10 +52,10 @@ public:
const char * what() const throw() override { return message().data(); }
/// Add something to the existing message.
template <typename ...Args>
void addMessage(const std::string& format, Args&&... args)
template <typename... Args>
void addMessage(fmt::format_string<Args...> format, Args &&... args)
{
extendedMessage(fmt::format(fmt::runtime(format), std::forward<Args>(args)...));
extendedMessage(fmt::format(format, std::forward<Args>(args)...));
}
void addMessage(const std::string& message)
@ -117,10 +117,10 @@ public:
ParsingException(int code, const std::string & message);
// Format message with fmt::format, like the logging functions.
template <typename ...Args>
ParsingException(int code, const std::string & fmt, Args&&... args)
: Exception(fmt::format(fmt::runtime(fmt), std::forward<Args>(args)...), code)
{}
template <typename... Args>
ParsingException(int code, fmt::format_string<Args...> fmt, Args &&... args) : Exception(code, fmt, std::forward<Args>(args)...)
{
}
std::string displayText() const

View File

@ -0,0 +1,46 @@
#pragma once
#include <optional>
#include <cmath>
namespace DB
{
class RangeGenerator
{
public:
explicit RangeGenerator(size_t total_size_, size_t range_step_, size_t range_start = 0)
: from(range_start), range_step(range_step_), total_size(total_size_)
{
}
size_t totalRanges() const { return static_cast<size_t>(round(static_cast<float>(total_size - from) / range_step)); }
using Range = std::pair<size_t, size_t>;
// return upper exclusive range of values, i.e. [from_range, to_range>
std::optional<Range> nextRange()
{
if (from >= total_size)
{
return std::nullopt;
}
auto to = from + range_step;
if (to >= total_size)
{
to = total_size;
}
Range range{from, to};
from = to;
return range;
}
private:
size_t from;
size_t range_step;
size_t total_size;
};
}

178
src/Common/format.h Normal file
View File

@ -0,0 +1,178 @@
#pragma once
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/PODArray.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace Format
{
using IndexPositions = PODArrayWithStackMemory<UInt64, 64>;
static inline void parseNumber(const String & description, UInt64 l, UInt64 r, UInt64 & res, UInt64 argument_number)
{
res = 0;
for (UInt64 pos = l; pos < r; ++pos)
{
if (!isNumericASCII(description[pos]))
throw Exception("Not a number in curly braces at position " + std::to_string(pos), ErrorCodes::BAD_ARGUMENTS);
res = res * 10 + description[pos] - '0';
if (res >= argument_number)
throw Exception(
"Too big number for arguments, must be at most " + std::to_string(argument_number - 1), ErrorCodes::BAD_ARGUMENTS);
}
}
static inline void init(
const String & pattern,
size_t argument_number,
const std::vector<std::optional<String>> & constant_strings,
IndexPositions & index_positions,
std::vector<String> & substrings)
{
/// Is current position after open curly brace.
bool is_open_curly = false;
/// The position of last open token.
size_t last_open = -1;
/// Is formatting in a plain {} token.
std::optional<bool> is_plain_numbering;
UInt64 index_if_plain = 0;
/// Left position of adding substrings, just to the closed brace position or the start of the string.
/// Invariant --- the start of substring is in this position.
size_t start_pos = 0;
/// A flag to decide whether we should glue the constant strings.
bool glue_to_next = false;
/// Handling double braces (escaping).
auto double_brace_removal = [](String & str)
{
size_t i = 0;
bool should_delete = true;
str.erase(
std::remove_if(
str.begin(),
str.end(),
[&i, &should_delete, &str](char)
{
bool is_double_brace = (str[i] == '{' && str[i + 1] == '{') || (str[i] == '}' && str[i + 1] == '}');
++i;
if (is_double_brace && should_delete)
{
should_delete = false;
return true;
}
should_delete = true;
return false;
}),
str.end());
};
index_positions.emplace_back();
for (size_t i = 0; i < pattern.size(); ++i)
{
if (pattern[i] == '{')
{
/// Escaping handling
/// It is safe to access because of null termination
if (pattern[i + 1] == '{')
{
++i;
continue;
}
if (is_open_curly)
throw Exception("Two open curly braces without close one at position " + std::to_string(i), ErrorCodes::BAD_ARGUMENTS);
String to_add = String(pattern.data() + start_pos, i - start_pos);
double_brace_removal(to_add);
if (!glue_to_next)
substrings.emplace_back(to_add);
else
substrings.back() += to_add;
glue_to_next = false;
is_open_curly = true;
last_open = i + 1;
}
else if (pattern[i] == '}')
{
if (pattern[i + 1] == '}')
{
++i;
continue;
}
if (!is_open_curly)
throw Exception("Closed curly brace without open one at position " + std::to_string(i), ErrorCodes::BAD_ARGUMENTS);
is_open_curly = false;
if (last_open == i)
{
if (is_plain_numbering && !*is_plain_numbering)
throw Exception(
"Cannot switch from automatic field numbering to manual field specification", ErrorCodes::BAD_ARGUMENTS);
is_plain_numbering = true;
if (index_if_plain >= argument_number)
throw Exception("Argument is too big for formatting", ErrorCodes::BAD_ARGUMENTS);
index_positions.back() = index_if_plain++;
}
else
{
if (is_plain_numbering && *is_plain_numbering)
throw Exception(
"Cannot switch from automatic field numbering to manual field specification", ErrorCodes::BAD_ARGUMENTS);
is_plain_numbering = false;
UInt64 arg;
parseNumber(pattern, last_open, i, arg, argument_number);
if (arg >= argument_number)
throw Exception(
"Argument is too big for formatting. Note that indexing starts from zero", ErrorCodes::BAD_ARGUMENTS);
index_positions.back() = arg;
}
if (!constant_strings.empty() && constant_strings[index_positions.back()])
{
/// The next string should be glued to last `A {} C`.format('B') -> `A B C`.
glue_to_next = true;
substrings.back() += *constant_strings[index_positions.back()];
}
else
index_positions.emplace_back(); /// Otherwise we commit arg number and proceed.
start_pos = i + 1;
}
}
if (is_open_curly)
throw Exception("Last open curly brace is not closed", ErrorCodes::BAD_ARGUMENTS);
String to_add = String(pattern.data() + start_pos, pattern.size() - start_pos);
double_brace_removal(to_add);
if (!glue_to_next)
substrings.emplace_back(to_add);
else
substrings.back() += to_add;
index_positions.pop_back();
}
}
}

View File

@ -6,6 +6,7 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ApplyWithSubqueryVisitor.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
@ -55,6 +56,9 @@ std::pair<String, StoragePtr> createTableFromAST(
ast_create_query.attach = true;
ast_create_query.setDatabase(database_name);
if (ast_create_query.select && ast_create_query.isView())
ApplyWithSubqueryVisitor().visit(*ast_create_query.select);
if (ast_create_query.as_table_function)
{
const auto & factory = TableFunctionFactory::instance();

View File

@ -179,8 +179,12 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
if (!task->was_executed)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} was executed, but was not committed: code {}: {}",
task->execution_status.code, task->execution_status.message);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Entry {} was executed, but was not committed: code {}: {}",
task->entry_name,
task->execution_status.code,
task->execution_status.message);
}
try_node->setAlreadyRemoved();

View File

@ -50,7 +50,7 @@ namespace
{
if (!qualified_name.database.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Dictionary source of type {} specifies a schema but schema is not supported by {}-driver",
"Dictionary source specifies a schema but schema is not supported by {}-driver",
bridge_.getName());
}

View File

@ -392,8 +392,13 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
if (bytes_to_predownload)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Failed to predownload remaining {} bytes. Current file segment: {}, current download offset: {}, expected: {}, eof: {}",
file_segment->range().toString(), file_segment->getDownloadOffset(), file_offset_of_buffer_end, implementation_buffer->eof());
"Failed to predownload remaining {} bytes. Current file segment: {}, current download offset: {}, expected: {}, "
"eof: {}",
bytes_to_predownload,
file_segment->range().toString(),
file_segment->getDownloadOffset(),
file_offset_of_buffer_end,
implementation_buffer->eof());
auto result = implementation_buffer->hasPendingData();

View File

@ -44,7 +44,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
{
return std::make_unique<ReadBufferFromS3>(
client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries,
settings, /* use_external_buffer */true, read_until_position, /* restricted_seek */true);
settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true);
};
if (with_cache)

View File

@ -85,9 +85,12 @@ FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String &
else if (path.has_parent_path() && !fs::weakly_canonical(default_schema_directory_path / path).string().starts_with(fs::weakly_canonical(default_schema_directory_path).string()))
{
if (is_server)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})",
path.string());
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})",
default_schema_directory(),
path.string(),
default_schema_directory());
path = default_schema_directory_path / path;
schema_path = path.filename();
schema_directory = path.parent_path() / "";

View File

@ -9,6 +9,7 @@
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeObject.h>
#include <Common/JSONParsers/SimdJSONParser.h>
#include <Common/JSONParsers/RapidJSONParser.h>
#include <Common/JSONParsers/DummyJSONParser.h>
@ -158,22 +159,37 @@ DataTypePtr getDataTypeFromJSONFieldImpl(const Element & field)
{
auto object = field.getObject();
DataTypePtr value_type;
bool is_object = false;
for (const auto key_value_pair : object)
{
auto type = getDataTypeFromJSONFieldImpl(key_value_pair.second);
if (!type)
return nullptr;
continue;
if (value_type && value_type->getName() != type->getName())
return nullptr;
if (isObject(type))
{
is_object = true;
break;
}
value_type = type;
if (!value_type)
{
value_type = type;
}
else if (!value_type->equals(*type))
{
is_object = true;
break;
}
}
if (!value_type)
return nullptr;
if (is_object)
return std::make_shared<DataTypeObject>("json", false);
return std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), value_type);
if (value_type)
return std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), value_type);
return nullptr;
}
throw Exception{ErrorCodes::INCORRECT_DATA, "Unexpected JSON type"};

View File

@ -7,6 +7,8 @@
#include <Formats/ReadSchemaUtils.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Common/assert_cast.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
namespace DB
{
@ -17,6 +19,28 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
static std::optional<NamesAndTypesList> getOrderedColumnsList(
const NamesAndTypesList & columns_list, const Names & columns_order_hint)
{
if (columns_list.size() != columns_order_hint.size())
return {};
std::unordered_map<String, DataTypePtr> available_columns;
for (const auto & [name, type] : columns_list)
available_columns.emplace(name, type);
NamesAndTypesList res;
for (const auto & name : columns_order_hint)
{
auto it = available_columns.find(name);
if (it == available_columns.end())
return {};
res.emplace_back(name, it->second);
}
return res;
}
ColumnsDescription readSchemaFromFormat(
const String & format_name,
const std::optional<FormatSettings> & format_settings,
@ -52,6 +76,22 @@ ColumnsDescription readSchemaFromFormat(
{
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message());
}
/// If we have "INSERT SELECT" query then try to order
/// columns as they are ordered in table schema for formats
/// without strict column order (like JSON and TSKV).
/// It will allow to execute simple data loading with query
/// "INSERT INTO table SELECT * FROM ..."
const auto & insertion_table = context->getInsertionTable();
if (!schema_reader->hasStrictOrderOfColumns() && !insertion_table.empty())
{
auto storage = DatabaseCatalog::instance().getTable(insertion_table, context);
auto metadata = storage->getInMemoryMetadataPtr();
auto names_in_storage = metadata->getColumns().getNamesOfPhysical();
auto ordered_list = getOrderedColumnsList(names_and_types, names_in_storage);
if (ordered_list)
names_and_types = *ordered_list;
}
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{} file format doesn't support schema inference", format_name);

View File

@ -53,6 +53,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnLowCardinality.h>
#include <Interpreters/Context.h>
#include <Common/HashTable/HashMap.h>
namespace DB
@ -3140,52 +3141,138 @@ private:
}
}
WrapperType createTupleToObjectWrapper(const DataTypeTuple & from_tuple, bool has_nullable_subcolumns) const
{
if (!from_tuple.haveExplicitNames())
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten Named Tuple. Got: {}", from_tuple.getName());
PathsInData paths;
DataTypes from_types;
std::tie(paths, from_types) = flattenTuple(from_tuple.getPtr());
auto to_types = from_types;
for (auto & type : to_types)
{
if (isTuple(type) || isNested(type))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten Named Tuple. Got: {}",
from_tuple.getName());
type = recursiveRemoveLowCardinality(type);
}
return [element_wrappers = getElementWrappers(from_types, to_types),
has_nullable_subcolumns, from_types, to_types, paths]
(ColumnsWithTypeAndName & arguments, const DataTypePtr &, const ColumnNullable * nullable_source, size_t input_rows_count)
{
size_t tuple_size = to_types.size();
auto flattened_column = flattenTuple(arguments.front().column);
const auto & column_tuple = assert_cast<const ColumnTuple &>(*flattened_column);
if (tuple_size != column_tuple.getColumns().size())
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Expected tuple with {} subcolumn, but got {} subcolumns",
tuple_size, column_tuple.getColumns().size());
auto res = ColumnObject::create(has_nullable_subcolumns);
for (size_t i = 0; i < tuple_size; ++i)
{
ColumnsWithTypeAndName element = {{column_tuple.getColumns()[i], from_types[i], "" }};
auto converted_column = element_wrappers[i](element, to_types[i], nullable_source, input_rows_count);
res->addSubcolumn(paths[i], converted_column->assumeMutable());
}
return res;
};
}
WrapperType createMapToObjectWrapper(const DataTypeMap & from_map, bool has_nullable_subcolumns) const
{
auto key_value_types = from_map.getKeyValueTypes();
if (!isStringOrFixedString(key_value_types[0]))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object from Map can be performed only from Map "
"with String or FixedString key. Got: {}", from_map.getName());
const auto & value_type = key_value_types[1];
auto to_value_type = value_type;
if (!has_nullable_subcolumns && value_type->isNullable())
to_value_type = removeNullable(value_type);
if (has_nullable_subcolumns && !value_type->isNullable())
to_value_type = makeNullable(value_type);
DataTypes to_key_value_types{std::make_shared<DataTypeString>(), std::move(to_value_type)};
auto element_wrappers = getElementWrappers(key_value_types, to_key_value_types);
return [has_nullable_subcolumns, element_wrappers, key_value_types, to_key_value_types]
(ColumnsWithTypeAndName & arguments, const DataTypePtr &, const ColumnNullable * nullable_source, size_t) -> ColumnPtr
{
const auto & column_map = assert_cast<const ColumnMap &>(*arguments.front().column);
const auto & offsets = column_map.getNestedColumn().getOffsets();
auto key_value_columns = column_map.getNestedData().getColumnsCopy();
for (size_t i = 0; i < 2; ++i)
{
ColumnsWithTypeAndName element{{key_value_columns[i], key_value_types[i], ""}};
key_value_columns[i] = element_wrappers[i](element, to_key_value_types[i], nullable_source, key_value_columns[i]->size());
}
const auto & key_column_str = assert_cast<const ColumnString &>(*key_value_columns[0]);
const auto & value_column = *key_value_columns[1];
using SubcolumnsMap = HashMap<StringRef, MutableColumnPtr, StringRefHash>;
SubcolumnsMap subcolumns;
for (size_t row = 0; row < offsets.size(); ++row)
{
for (size_t i = offsets[static_cast<ssize_t>(row) - 1]; i < offsets[row]; ++i)
{
auto ref = key_column_str.getDataAt(i);
bool inserted;
SubcolumnsMap::LookupResult it;
subcolumns.emplace(ref, it, inserted);
auto & subcolumn = it->getMapped();
if (inserted)
subcolumn = value_column.cloneEmpty()->cloneResized(row);
/// Map can have duplicated keys. We insert only first one.
if (subcolumn->size() == row)
subcolumn->insertFrom(value_column, i);
}
/// Insert default values for keys missed in current row.
for (const auto & [_, subcolumn] : subcolumns)
if (subcolumn->size() == row)
subcolumn->insertDefault();
}
auto column_object = ColumnObject::create(has_nullable_subcolumns);
for (auto && [key, subcolumn] : subcolumns)
{
PathInData path(key.toView());
column_object->addSubcolumn(path, std::move(subcolumn));
}
return column_object;
};
}
WrapperType createObjectWrapper(const DataTypePtr & from_type, const DataTypeObject * to_type) const
{
if (const auto * from_tuple = checkAndGetDataType<DataTypeTuple>(from_type.get()))
{
if (!from_tuple->haveExplicitNames())
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten Named Tuple. Got: {}", from_type->getName());
PathsInData paths;
DataTypes from_types;
std::tie(paths, from_types) = flattenTuple(from_type);
auto to_types = from_types;
for (auto & type : to_types)
{
if (isTuple(type) || isNested(type))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten Named Tuple. Got: {}", from_type->getName());
type = recursiveRemoveLowCardinality(type);
}
return [element_wrappers = getElementWrappers(from_types, to_types),
has_nullable_subcolumns = to_type->hasNullableSubcolumns(), from_types, to_types, paths]
(ColumnsWithTypeAndName & arguments, const DataTypePtr &, const ColumnNullable * nullable_source, size_t input_rows_count)
{
size_t tuple_size = to_types.size();
auto flattened_column = flattenTuple(arguments.front().column);
const auto & column_tuple = assert_cast<const ColumnTuple &>(*flattened_column);
if (tuple_size != column_tuple.getColumns().size())
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Expected tuple with {} subcolumn, but got {} subcolumns",
tuple_size, column_tuple.getColumns().size());
auto res = ColumnObject::create(has_nullable_subcolumns);
for (size_t i = 0; i < tuple_size; ++i)
{
ColumnsWithTypeAndName element = {{column_tuple.getColumns()[i], from_types[i], "" }};
auto converted_column = element_wrappers[i](element, to_types[i], nullable_source, input_rows_count);
res->addSubcolumn(paths[i], converted_column->assumeMutable());
}
return res;
};
return createTupleToObjectWrapper(*from_tuple, to_type->hasNullableSubcolumns());
}
else if (const auto * from_map = checkAndGetDataType<DataTypeMap>(from_type.get()))
{
return createMapToObjectWrapper(*from_map, to_type->hasNullableSubcolumns());
}
else if (checkAndGetDataType<DataTypeString>(from_type.get()))
{
@ -3199,7 +3286,7 @@ private:
}
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten named tuple or string. Got: {}", from_type->getName());
"Cast to Object can be performed only from flatten named Tuple, Map or String. Got: {}", from_type->getName());
}
template <typename FieldType>

View File

@ -259,7 +259,7 @@ public:
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function '{}' needs at least 2 arguments, at most 3 arguments; passed {}.",
arguments.size());
name, arguments.size());
if (!isString(arguments[0]))
throw Exception("Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + ". Must be String.",

View File

@ -181,9 +181,12 @@ ColumnPtr IExecutableFunction::defaultImplementationForNulls(
// Default implementation for nulls returns null result for null arguments,
// so the result type must be nullable.
if (!result_type->isNullable())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} with Null argument and default implementation for Nulls "
"is expected to return Nullable result, got {}", result_type->getName());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Function {} with Null argument and default implementation for Nulls "
"is expected to return Nullable result, got {}",
getName(),
result_type->getName());
return result_type->createColumnConstWithDefaultValue(input_rows_count);
}

View File

@ -231,7 +231,7 @@ private:
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} decimal scale should have native UInt type. Actual {}",
scale_argument.type->getName());
getName(), scale_argument.type->getName());
}
scale = arguments[additional_argument_index].column->getUInt(0);

View File

@ -52,23 +52,21 @@ public:
{
if (arguments.size() < 2)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", should be at least 2.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() > FormatImpl::argument_threshold)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", should be at most " + std::to_string(FormatImpl::argument_threshold),
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be at least 2",
getName(),
arguments.size());
for (const auto arg_idx : collections::range(0, arguments.size()))
{
const auto * arg = arguments[arg_idx].get();
if (!isStringOrFixedString(arg))
throw Exception{"Illegal type " + arg->getName() + " of argument " + std::to_string(arg_idx + 1) + " of function "
+ getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}",
arg->getName(),
arg_idx + 1,
getName());
}
return std::make_shared<DataTypeString>();
@ -125,7 +123,7 @@ private:
std::vector<const ColumnString::Chars *> data(num_arguments);
std::vector<const ColumnString::Offsets *> offsets(num_arguments);
std::vector<size_t> fixed_string_sizes(num_arguments);
std::vector<String> constant_strings(num_arguments);
std::vector<std::optional<String>> constant_strings(num_arguments);
bool has_column_string = false;
bool has_column_fixed_string = false;
for (size_t i = 0; i < num_arguments; ++i)

View File

@ -112,7 +112,7 @@ public:
|| (res = executeType<DataTypeDateTime64>(arguments, result_type))))
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of function {], must be Date or DateTime.",
"Illegal column {} of function {}, must be Date or DateTime.",
arguments[1].column->getName(),
getName());

View File

@ -0,0 +1,68 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/ObjectUtils.h>
#include <Columns/ColumnTuple.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
class FunctionFlattenTuple : public IFunction
{
public:
static constexpr auto name = "flattenTuple";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionFlattenTuple>(); }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo &) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const auto & type = arguments[0];
const auto * type_tuple = checkAndGetDataType<DataTypeTuple>(type.get());
if (!type_tuple || !type_tuple->haveExplicitNames())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument for function '{}' must be Named Tuple. Got '{}'",
getName(), type->getName());
auto [paths, types] = flattenTuple(type);
Names names;
names.reserve(paths.size());
for (const auto & path : paths)
names.push_back(path.getPath());
return std::make_shared<DataTypeTuple>(types, names);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
auto column = arguments.at(0).column;
if (!checkAndGetColumn<ColumnTuple>(column.get()))
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}. Expected ColumnTuple",
column->getName(), getName());
return flattenTuple(column);
}
};
}
void registerFunctionFlattenTuple(FunctionFactory & factory)
{
factory.registerFunction<FunctionFlattenTuple>();
}
}

View File

@ -45,25 +45,23 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.empty())
if (arguments.size() < 2)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", should be at least 1",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() > FormatImpl::argument_threshold)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size())
+ ", should be at most " + std::to_string(FormatImpl::argument_threshold),
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be at least 2",
getName(),
arguments.size());
for (const auto arg_idx : collections::range(0, arguments.size()))
{
const auto * arg = arguments[arg_idx].get();
if (!isStringOrFixedString(arg))
throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(arg_idx + 1) + " of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}",
arg->getName(),
arg_idx + 1,
getName());
}
return std::make_shared<DataTypeString>();
@ -84,7 +82,7 @@ public:
std::vector<const ColumnString::Chars *> data(arguments.size() - 1);
std::vector<const ColumnString::Offsets *> offsets(arguments.size() - 1);
std::vector<size_t> fixed_string_sizes(arguments.size() - 1);
std::vector<String> constant_strings(arguments.size() - 1);
std::vector<std::optional<String>> constant_strings(arguments.size() - 1);
bool has_column_string = false;
bool has_column_fixed_string = false;

View File

@ -4,8 +4,10 @@
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/format.h>
#include <Common/memcpySmall.h>
#include <algorithm>
#include <optional>
#include <string>
@ -15,15 +17,9 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
struct FormatImpl
{
static constexpr size_t small_argument_threshold = 1024;
static constexpr size_t argument_threshold = std::numeric_limits<UInt32>::max();
static constexpr size_t right_padding = 15;
template <typename... Args>
@ -39,165 +35,10 @@ struct FormatImpl
format<false, false>(std::forward<Args>(args)...);
}
static void parseNumber(const String & description, UInt64 l, UInt64 r, UInt64 & res)
{
res = 0;
for (UInt64 pos = l; pos < r; ++pos)
{
if (!isNumericASCII(description[pos]))
throw Exception("Not a number in curly braces at position " + std::to_string(pos), ErrorCodes::BAD_ARGUMENTS);
res = res * 10 + description[pos] - '0';
if (res >= argument_threshold)
throw Exception(
"Too big number for arguments, must be at most " + std::to_string(argument_threshold), ErrorCodes::BAD_ARGUMENTS);
}
}
static inline void init(
const String & pattern,
const std::vector<const ColumnString::Chars *> & data,
size_t argument_number,
const std::vector<String> & constant_strings,
UInt64 * index_positions_ptr,
std::vector<String> & substrings)
{
/// Is current position after open curly brace.
bool is_open_curly = false;
/// The position of last open token.
size_t last_open = -1;
/// Is formatting in a plain {} token.
std::optional<bool> is_plain_numbering;
UInt64 index_if_plain = 0;
/// Left position of adding substrings, just to the closed brace position or the start of the string.
/// Invariant --- the start of substring is in this position.
size_t start_pos = 0;
/// A flag to decide whether we should glue the constant strings.
bool glue_to_next = false;
/// Handling double braces (escaping).
auto double_brace_removal = [](String & str)
{
size_t i = 0;
bool should_delete = true;
str.erase(
std::remove_if(
str.begin(),
str.end(),
[&i, &should_delete, &str](char)
{
bool is_double_brace = (str[i] == '{' && str[i + 1] == '{') || (str[i] == '}' && str[i + 1] == '}');
++i;
if (is_double_brace && should_delete)
{
should_delete = false;
return true;
}
should_delete = true;
return false;
}),
str.end());
};
for (size_t i = 0; i < pattern.size(); ++i)
{
if (pattern[i] == '{')
{
/// Escaping handling
/// It is safe to access because of null termination
if (pattern[i + 1] == '{')
{
++i;
continue;
}
if (is_open_curly)
throw Exception("Two open curly braces without close one at position " + std::to_string(i), ErrorCodes::BAD_ARGUMENTS);
String to_add = String(pattern.data() + start_pos, i - start_pos);
double_brace_removal(to_add);
if (!glue_to_next)
substrings.emplace_back(to_add);
else
substrings.back() += to_add;
glue_to_next = false;
is_open_curly = true;
last_open = i + 1;
}
else if (pattern[i] == '}')
{
if (pattern[i + 1] == '}')
{
++i;
continue;
}
if (!is_open_curly)
throw Exception("Closed curly brace without open one at position " + std::to_string(i), ErrorCodes::BAD_ARGUMENTS);
is_open_curly = false;
if (last_open == i)
{
if (is_plain_numbering && !*is_plain_numbering)
throw Exception(
"Cannot switch from automatic field numbering to manual field specification", ErrorCodes::BAD_ARGUMENTS);
is_plain_numbering = true;
if (index_if_plain >= argument_number)
throw Exception("Argument is too big for formatting", ErrorCodes::BAD_ARGUMENTS);
*index_positions_ptr = index_if_plain++;
}
else
{
if (is_plain_numbering && *is_plain_numbering)
throw Exception(
"Cannot switch from automatic field numbering to manual field specification", ErrorCodes::BAD_ARGUMENTS);
is_plain_numbering = false;
UInt64 arg;
parseNumber(pattern, last_open, i, arg);
if (arg >= argument_number)
throw Exception(
"Argument is too big for formatting. Note that indexing starts from zero", ErrorCodes::BAD_ARGUMENTS);
*index_positions_ptr = arg;
}
/// Constant string.
if (!data[*index_positions_ptr])
{
/// The next string should be glued to last `A {} C`.format('B') -> `A B C`.
glue_to_next = true;
substrings.back() += constant_strings[*index_positions_ptr];
}
else
++index_positions_ptr; /// Otherwise we commit arg number and proceed.
start_pos = i + 1;
}
}
if (is_open_curly)
throw Exception("Last open curly brace is not closed", ErrorCodes::BAD_ARGUMENTS);
String to_add = String(pattern.data() + start_pos, pattern.size() - start_pos);
double_brace_removal(to_add);
if (!glue_to_next)
substrings.emplace_back(to_add);
else
substrings.back() += to_add;
}
/// data for ColumnString and ColumnFixed. Nullptr means no data, it is const string.
/// offsets for ColumnString, nullptr is an indicator that there is a fixed string rather than ColumnString.
/// fixed_string_N for savings N to fixed strings.
/// constant_strings for constant strings. If data[i] is nullptr, than it is constant string.
/// constant_strings for constant strings. If data[i] is nullptr, it is constant string.
/// res_data is result_data, res_offsets is offset result.
/// input_rows_count is the number of rows processed.
/// Precondition: data.size() == offsets.size() == fixed_string_N.size() == constant_strings.size().
@ -207,29 +48,22 @@ struct FormatImpl
const std::vector<const ColumnString::Chars *> & data,
const std::vector<const ColumnString::Offsets *> & offsets,
[[maybe_unused]] /* Because sometimes !has_column_fixed_string */ const std::vector<size_t> & fixed_string_N,
const std::vector<String> & constant_strings,
const std::vector<std::optional<String>> & constant_strings,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets,
size_t input_rows_count)
{
const size_t argument_number = offsets.size();
UInt64 small_index_positions_buffer[small_argument_threshold];
/// The subsequent indexes of strings we should use. e.g `Hello world {1} {3} {1} {0}` this array will be filled with [1, 3, 1, 0, ... (garbage)] but without constant string indices.
UInt64 * index_positions = small_index_positions_buffer;
std::unique_ptr<UInt64[]> big_index_positions_buffer;
if (argument_number > small_argument_threshold)
{
big_index_positions_buffer.reset(new UInt64[argument_number]);
index_positions = big_index_positions_buffer.get();
}
/// The subsequent indexes of strings we should use. e.g `Hello world {1} {3} {1} {0}` this
/// array will be filled with [1, 3, 1, 0] but without constant string indices.
Format::IndexPositions index_positions;
/// Vector of substrings of pattern that will be copied to the answer, not string view because of escaping and iterators invalidation.
/// These are exactly what is between {} tokens, for `Hello {} world {}` we will have [`Hello `, ` world `, ``].
std::vector<String> substrings;
init(pattern, data, argument_number, constant_strings, index_positions, substrings);
Format::init(pattern, argument_number, constant_strings, index_positions, substrings);
UInt64 final_size = 0;
@ -271,7 +105,7 @@ struct FormatImpl
for (size_t j = 1; j < substrings.size(); ++j)
{
UInt64 arg = index_positions[j - 1];
auto offset_ptr = offsets[arg];
const auto * offset_ptr = offsets[arg];
UInt64 arg_offset = 0;
UInt64 size = 0;

View File

@ -80,6 +80,7 @@ void registerFunctionInitialQueryID(FunctionFactory & factory);
void registerFunctionServerUUID(FunctionFactory &);
void registerFunctionZooKeeperSessionUptime(FunctionFactory &);
void registerFunctionGetOSKernelVersion(FunctionFactory &);
void registerFunctionFlattenTuple(FunctionFactory &);
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
@ -166,6 +167,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionServerUUID(factory);
registerFunctionZooKeeperSessionUptime(factory);
registerFunctionGetOSKernelVersion(factory);
registerFunctionFlattenTuple(factory);
#if USE_ICU
registerFunctionConvertCharset(factory);

View File

@ -237,7 +237,7 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
while (!emergency_stop && !read_worker->cancel)
{
if (!read_worker->reader->next())
throw Exception("Failed to read all the data from the reader", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to read all the data from the reader, missing {} bytes", read_worker->bytes_left);
if (emergency_stop || read_worker->cancel)
break;

View File

@ -82,8 +82,8 @@ public:
std::unique_ptr<ReadBufferFactory> reader_factory_,
ThreadPool * pool,
size_t max_working_readers,
WorkerSetup worker_setup = {},
WorkerCleanup worker_cleanup = {});
WorkerSetup worker_setup = [](ThreadStatus &){},
WorkerCleanup worker_cleanup = [](ThreadStatus &){});
~ParallelReadBuffer() override { finishAndWait(); }

View File

@ -1,4 +1,5 @@
#include <Common/config.h>
#include "IO/S3Common.h"
#if USE_AWS_S3
@ -42,6 +43,7 @@ ReadBufferFromS3::ReadBufferFromS3(
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t offset_,
size_t read_until_position_,
bool restricted_seek_)
: SeekableReadBufferWithSize(nullptr, 0)
@ -49,9 +51,10 @@ ReadBufferFromS3::ReadBufferFromS3(
, bucket(bucket_)
, key(key_)
, max_single_read_retries(max_single_read_retries_)
, offset(offset_)
, read_until_position(read_until_position_)
, read_settings(settings_)
, use_external_buffer(use_external_buffer_)
, read_until_position(read_until_position_)
, restricted_seek(restricted_seek_)
{
}
@ -210,13 +213,14 @@ std::optional<size_t> ReadBufferFromS3::getTotalSize()
if (file_size)
return file_size;
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket);
request.SetKey(key);
auto object_size = S3::getObjectSize(client_ptr, bucket, key, false);
auto outcome = client_ptr->HeadObject(request);
auto head_result = outcome.GetResultWithOwnership();
file_size = head_result.GetContentLength();
if (!object_size)
{
return std::nullopt;
}
file_size = object_size;
return file_size;
}
@ -234,6 +238,11 @@ void ReadBufferFromS3::setReadUntilPosition(size_t position)
}
}
SeekableReadBuffer::Range ReadBufferFromS3::getRemainingReadRange() const
{
return Range{.left = static_cast<size_t>(offset), .right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt};
}
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
Aws::S3::Model::GetObjectRequest req;
@ -272,6 +281,36 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
SeekableReadBufferPtr ReadBufferS3Factory::getReader()
{
const auto next_range = range_generator.nextRange();
if (!next_range)
{
return nullptr;
}
auto reader = std::make_shared<ReadBufferFromS3>(
client_ptr,
bucket,
key,
s3_max_single_read_retries,
read_settings,
false /*use_external_buffer*/,
next_range->first,
next_range->second);
return reader;
}
off_t ReadBufferS3Factory::seek(off_t off, [[maybe_unused]] int whence)
{
range_generator = RangeGenerator{object_size, range_step, static_cast<size_t>(off)};
return off;
}
std::optional<size_t> ReadBufferS3Factory::getTotalSize()
{
return object_size;
}
}
#endif

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/RangeGenerator.h>
#include <Common/config.h>
#if USE_AWS_S3
@ -7,6 +8,7 @@
#include <memory>
#include <IO/HTTPCommon.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadSettings.h>
#include <IO/SeekableReadBuffer.h>
@ -30,7 +32,9 @@ private:
String bucket;
String key;
UInt64 max_single_read_retries;
off_t offset = 0;
off_t read_until_position = 0;
Aws::S3::Model::GetObjectResult read_result;
std::unique_ptr<ReadBuffer> impl;
@ -45,6 +49,7 @@ public:
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer = false,
size_t offset_ = 0,
size_t read_until_position_ = 0,
bool restricted_seek_ = false);
@ -58,7 +63,7 @@ public:
void setReadUntilPosition(size_t position) override;
Range getRemainingReadRange() const override { return Range{ .left = static_cast<size_t>(offset), .right = read_until_position }; }
Range getRemainingReadRange() const override;
size_t getFileOffsetOfBufferEnd() const override { return offset; }
@ -69,13 +74,55 @@ private:
bool use_external_buffer;
off_t read_until_position = 0;
/// There is different seek policy for disk seek and for non-disk seek
/// (non-disk seek is applied for seekable input formats: orc, arrow, parquet).
bool restricted_seek;
};
/// Creates separate ReadBufferFromS3 for sequence of ranges of particular object
class ReadBufferS3Factory : public ParallelReadBuffer::ReadBufferFactory
{
public:
explicit ReadBufferS3Factory(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t range_step_,
size_t object_size_,
UInt64 s3_max_single_read_retries_,
const ReadSettings & read_settings_)
: client_ptr(client_ptr_)
, bucket(bucket_)
, key(key_)
, read_settings(read_settings_)
, range_generator(object_size_, range_step_)
, range_step(range_step_)
, object_size(object_size_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
{
assert(range_step > 0);
assert(range_step < object_size);
}
SeekableReadBufferPtr getReader() override;
off_t seek(off_t off, [[maybe_unused]] int whence) override;
std::optional<size_t> getTotalSize() override;
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
const String bucket;
const String key;
ReadSettings read_settings;
RangeGenerator range_generator;
size_t range_step;
size_t object_size;
UInt64 s3_max_single_read_retries;
};
}
#endif

View File

@ -1,6 +1,7 @@
#pragma once
#include <functional>
#include <Common/RangeGenerator.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/ParallelReadBuffer.h>
@ -635,43 +636,6 @@ public:
void buildNewSession(const Poco::URI & uri) override { session = makeHTTPSession(uri, timeouts); }
};
class RangeGenerator
{
public:
explicit RangeGenerator(size_t total_size_, size_t range_step_, size_t range_start = 0)
: from(range_start), range_step(range_step_), total_size(total_size_)
{
}
size_t totalRanges() const { return static_cast<size_t>(round(static_cast<float>(total_size - from) / range_step)); }
using Range = std::pair<size_t, size_t>;
// return upper exclusive range of values, i.e. [from_range, to_range>
std::optional<Range> nextRange()
{
if (from >= total_size)
{
return std::nullopt;
}
auto to = from + range_step;
if (to >= total_size)
{
to = total_size;
}
Range range{from, to};
from = to;
return range;
}
private:
size_t from;
size_t range_step;
size_t total_size;
};
class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>
{
using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>;

View File

@ -24,6 +24,7 @@
# include <aws/core/utils/UUID.h>
# include <aws/core/http/HttpClientFactory.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/HeadObjectRequest.h> // Y_IGNORE
# include <IO/S3/PocoHTTPClientFactory.h>
# include <IO/S3/PocoHTTPClient.h>
@ -682,6 +683,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
}
namespace S3
@ -839,6 +841,26 @@ namespace S3
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
}
size_t getObjectSize(std::shared_ptr<Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, bool throw_on_error)
{
Aws::S3::Model::HeadObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
Aws::S3::Model::HeadObjectOutcome outcome = client_ptr->HeadObject(req);
if (outcome.IsSuccess())
{
auto read_result = outcome.GetResultWithOwnership();
return static_cast<size_t>(read_result.GetContentLength());
}
else if (throw_on_error)
{
throw DB::Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
return 0;
}
}
}

View File

@ -75,6 +75,8 @@ struct URI
static void validateBucket(const String & bucket, const Poco::URI & uri);
};
size_t getObjectSize(std::shared_ptr<Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, bool throw_on_error = true);
}
#endif

View File

@ -169,6 +169,7 @@ public:
if (columns.size() != float_features_count + cat_features_count)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Number of columns is different with number of features: columns size {} float features size {} + cat features size {}",
columns.size(),
float_features_count,
cat_features_count);

View File

@ -233,7 +233,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
{
assert(!db_and_table.first && !db_and_table.second);
if (exception)
exception->emplace(ErrorCodes::UNKNOWN_TABLE, "Table {} doesn't exist", table_id.getNameForLogs());
exception->emplace(fmt::format("Table {} doesn't exist", table_id.getNameForLogs()), ErrorCodes::UNKNOWN_TABLE);
return {};
}
@ -263,7 +263,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
/// If table_id has no UUID, then the name of database was specified by user and table_id was not resolved through context.
/// Do not allow access to TEMPORARY_DATABASE because it contains all temporary tables of all contexts and users.
if (exception)
exception->emplace(ErrorCodes::DATABASE_ACCESS_DENIED, "Direct access to `{}` database is not allowed", String(TEMPORARY_DATABASE));
exception->emplace(fmt::format("Direct access to `{}` database is not allowed", TEMPORARY_DATABASE), ErrorCodes::DATABASE_ACCESS_DENIED);
return {};
}
@ -274,7 +274,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
if (databases.end() == it)
{
if (exception)
exception->emplace(ErrorCodes::UNKNOWN_DATABASE, "Database {} doesn't exist", backQuoteIfNeed(table_id.getDatabaseName()));
exception->emplace(fmt::format("Database {} doesn't exist", backQuoteIfNeed(table_id.getDatabaseName())), ErrorCodes::UNKNOWN_DATABASE);
return {};
}
database = it->second;
@ -282,7 +282,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
auto table = database->tryGetTable(table_id.table_name, context_);
if (!table && exception)
exception->emplace(ErrorCodes::UNKNOWN_TABLE, "Table {} doesn't exist", table_id.getNameForLogs());
exception->emplace(fmt::format("Table {} doesn't exist", table_id.getNameForLogs()), ErrorCodes::UNKNOWN_TABLE);
if (!table)
database = nullptr;

View File

@ -358,6 +358,7 @@ BlockIO InterpreterInsertQuery::execute()
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
new_context->setInsertionTable(getContext()->getInsertionTable());
InterpreterSelectWithUnionQuery interpreter_select{
query.select, new_context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};

View File

@ -320,12 +320,13 @@ Chunk DDLQueryStatusSource::generate()
if (throw_on_timeout)
{
if (!first_exception)
first_exception = std::make_unique<Exception>(ErrorCodes::TIMEOUT_EXCEEDED, msg_format,
node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
first_exception = std::make_unique<Exception>(
fmt::format(msg_format, node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts),
ErrorCodes::TIMEOUT_EXCEEDED);
return {};
}
LOG_INFO(log, fmt::runtime(msg_format), node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
LOG_INFO(log, msg_format, node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
@ -358,9 +359,12 @@ Chunk DDLQueryStatusSource::generate()
/// Paradoxically, this exception will be throw even in case of "never_throw" mode.
if (!first_exception)
first_exception = std::make_unique<Exception>(ErrorCodes::UNFINISHED,
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
" since it was finished (or its lifetime is expired)", node_path);
first_exception = std::make_unique<Exception>(
fmt::format(
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
" since it was finished (or its lifetime is expired)",
node_path),
ErrorCodes::UNFINISHED);
return {};
}
@ -386,7 +390,8 @@ Chunk DDLQueryStatusSource::generate()
if (status.code != 0 && !first_exception
&& context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW)
{
first_exception = std::make_unique<Exception>(status.code, "There was an error on [{}:{}]: {}", host, port, status.message);
first_exception = std::make_unique<Exception>(
fmt::format("There was an error on [{}:{}]: {}", host, port, status.message), status.code);
}
++num_hosts_finished;

View File

@ -657,6 +657,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context->setInsertionTable(std::move(table_id));
}
{
std::unique_ptr<OpenTelemetrySpanHolder> span;
if (context->query_trace_context.trace_id != UUID())
@ -667,14 +675,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
res = interpreter->execute();
}
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context->setInsertionTable(std::move(table_id));
}
}
if (process_list_entry)

View File

@ -18,6 +18,10 @@ public:
virtual NamesAndTypesList readSchema() = 0;
/// True if order of columns is important in format.
/// Exceptions: JSON, TSKV.
virtual bool hasStrictOrderOfColumns() const { return true; }
virtual ~ISchemaReader() = default;
protected:
@ -60,6 +64,7 @@ class IRowWithNamesSchemaReader : public ISchemaReader
public:
IRowWithNamesSchemaReader(ReadBuffer & in_, size_t max_rows_to_read_, DataTypePtr default_type_ = nullptr);
NamesAndTypesList readSchema() override;
bool hasStrictOrderOfColumns() const override { return false; }
protected:
/// Read one row and determine types of columns in it.

View File

@ -359,7 +359,7 @@ bool MsgPackVisitor::visit_ext(const char * value, uint32_t size)
return true;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported MsgPack extension type: {%x}", type);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported MsgPack extension type: {:x}", type);
}
void MsgPackVisitor::parse_error(size_t, size_t) // NOLINT
@ -498,7 +498,7 @@ DataTypePtr MsgPackSchemaReader::getDataType(const msgpack::object & object)
msgpack::object_ext object_ext = object.via.ext;
if (object_ext.type() == int8_t(MsgPackExtensionTypes::UUIDType))
return std::make_shared<DataTypeUUID>();
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Msgpack extension type {%x} is not supported", object_ext.type());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Msgpack extension type {:x} is not supported", object_ext.type());
}
}
__builtin_unreachable();

View File

@ -45,7 +45,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & context)
static MergeTreeReaderSettings getMergeTreeReaderSettings(
const ContextPtr & context, const SelectQueryInfo & query_info)
{
const auto & settings = context->getSettingsRef();
return
@ -53,6 +54,7 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & con
.read_settings = context->getReadSettings(),
.save_marks_in_cache = true,
.checksum_on_read = settings.checksum_on_read,
.read_in_order = query_info.input_order_info != nullptr,
};
}
@ -82,7 +84,7 @@ ReadFromMergeTree::ReadFromMergeTree(
getPrewhereInfo(query_info_),
data_.getPartitionValueType(),
virt_column_names_)})
, reader_settings(getMergeTreeReaderSettings(context_))
, reader_settings(getMergeTreeReaderSettings(context_, query_info_))
, prepared_parts(std::move(parts_))
, real_column_names(std::move(real_column_names_))
, virt_column_names(std::move(virt_column_names_))
@ -206,6 +208,7 @@ ProcessorPtr ReadFromMergeTree::createSource(
.colums_to_read = required_columns
};
}
return std::make_shared<TSource>(
data, storage_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, prewhere_info,
@ -921,7 +924,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
parts_before_pk = parts.size();
auto reader_settings = getMergeTreeReaderSettings(context);
auto reader_settings = getMergeTreeReaderSettings(context, query_info);
bool use_skip_indexes = settings.use_skip_indexes;
if (select.final() && !settings.use_skip_indexes_if_final)

View File

@ -1,10 +1,13 @@
// Needs to go first because its partial specialization of fmt::formatter
// should be defined before any instantiation
#include <fmt/ostream.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <base/logger_useful.h>
#include <cppkafka/cppkafka.h>
#include <boost/algorithm/string/join.hpp>
#include <fmt/ostream.h>
#include <algorithm>
namespace DB

View File

@ -575,9 +575,10 @@ size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
return checksum->second.file_size;
}
String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(const StorageMetadataPtr & metadata_snapshot) const
String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(const StorageSnapshotPtr & storage_snapshot) const
{
const auto & storage_columns = metadata_snapshot->getColumns().getAllPhysical();
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects().withSubcolumns();
auto storage_columns = storage_snapshot->getColumns(options);
MergeTreeData::AlterConversions alter_conversions;
if (!parent_part)
alter_conversions = storage.getAlterConversionsForPart(shared_from_this());

View File

@ -168,7 +168,7 @@ public:
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinimumCompressedSize(const StorageMetadataPtr & metadata_snapshot) const;
String getColumnNameWithMinimumCompressedSize(const StorageSnapshotPtr & storage_snapshot) const;
bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); }

View File

@ -24,7 +24,7 @@ namespace
/// least one existing (physical) column in part.
bool injectRequiredColumnsRecursively(
const String & column_name,
const ColumnsDescription & storage_columns,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::AlterConversions & alter_conversions,
const MergeTreeData::DataPartPtr & part,
Names & columns,
@ -36,7 +36,8 @@ bool injectRequiredColumnsRecursively(
/// stages.
checkStackSize();
auto column_in_storage = storage_columns.tryGetColumnOrSubcolumn(GetColumnsOptions::AllPhysical, column_name);
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns().withExtendedObjects();
auto column_in_storage = storage_snapshot->tryGetColumn(options, column_name);
if (column_in_storage)
{
auto column_name_in_part = column_in_storage->getNameInStorage();
@ -63,7 +64,8 @@ bool injectRequiredColumnsRecursively(
/// Column doesn't have default value and don't exist in part
/// don't need to add to required set.
const auto column_default = storage_columns.getDefault(column_name);
auto metadata_snapshot = storage_snapshot->getMetadataForQuery();
const auto column_default = metadata_snapshot->getColumns().getDefault(column_name);
if (!column_default)
return false;
@ -73,39 +75,36 @@ bool injectRequiredColumnsRecursively(
bool result = false;
for (const auto & identifier : identifiers)
result |= injectRequiredColumnsRecursively(identifier, storage_columns, alter_conversions, part, columns, required_columns, injected_columns);
result |= injectRequiredColumnsRecursively(identifier, storage_snapshot, alter_conversions, part, columns, required_columns, injected_columns);
return result;
}
}
NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetadataPtr & metadata_snapshot, const MergeTreeData::DataPartPtr & part, Names & columns)
NameSet injectRequiredColumns(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & part,
Names & columns)
{
NameSet required_columns{std::begin(columns), std::end(columns)};
NameSet injected_columns;
bool have_at_least_one_physical_column = false;
const auto & storage_columns = metadata_snapshot->getColumns();
MergeTreeData::AlterConversions alter_conversions;
if (!part->isProjectionPart())
alter_conversions = storage.getAlterConversionsForPart(part);
for (size_t i = 0; i < columns.size(); ++i)
{
auto name_in_storage = Nested::extractTableName(columns[i]);
if (storage_columns.has(name_in_storage) && isObject(storage_columns.get(name_in_storage).type))
{
have_at_least_one_physical_column = true;
continue;
}
/// We are going to fetch only physical columns
if (!storage_columns.hasColumnOrSubcolumn(GetColumnsOptions::AllPhysical, columns[i]))
throw Exception("There is no physical column or subcolumn " + columns[i] + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns().withExtendedObjects();
if (!storage_snapshot->tryGetColumn(options, columns[i]))
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no physical column or subcolumn {} in table", columns[i]);
have_at_least_one_physical_column |= injectRequiredColumnsRecursively(
columns[i], storage_columns, alter_conversions,
columns[i], storage_snapshot, alter_conversions,
part, columns, required_columns, injected_columns);
}
@ -115,7 +114,7 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetada
*/
if (!have_at_least_one_physical_column)
{
const auto minimum_size_column_name = part->getColumnNameWithMinimumCompressedSize(metadata_snapshot);
const auto minimum_size_column_name = part->getColumnNameWithMinimumCompressedSize(storage_snapshot);
columns.push_back(minimum_size_column_name);
/// correctly report added column
injected_columns.insert(columns.back());
@ -271,7 +270,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
Names pre_column_names;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, storage_snapshot->getMetadataForQuery(), data_part, column_names).empty();
bool should_reorder = !injectRequiredColumns(storage, storage_snapshot, data_part, column_names).empty();
if (prewhere_info)
{
@ -296,7 +295,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
const auto injected_pre_columns = injectRequiredColumns(storage, storage_snapshot->getMetadataForQuery(), data_part, pre_column_names);
const auto injected_pre_columns = injectRequiredColumns(storage, storage_snapshot, data_part, pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;

View File

@ -22,7 +22,7 @@ using MergeTreeBlockSizePredictorPtr = std::shared_ptr<MergeTreeBlockSizePredict
* so that you can calculate the DEFAULT expression for these columns.
* Adds them to the `columns`.
*/
NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetadataPtr & metadata_snapshot, const MergeTreeData::DataPartPtr & part, Names & columns);
NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageSnapshotPtr & storage_snapshot, const MergeTreeData::DataPartPtr & part, Names & columns);
/// A batch of work for MergeTreeThreadSelectBlockInputStream

View File

@ -877,12 +877,22 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
{
std::atomic<size_t> total_rows{0};
/// Do not check number of read rows if we have reading
/// in order of sorting key with limit.
/// In general case, when there exists WHERE clause
/// it's impossible to estimate number of rows precisely,
/// because we can stop reading at any time.
SizeLimits limits;
if (settings.read_overflow_mode == OverflowMode::THROW && settings.max_rows_to_read)
if (settings.read_overflow_mode == OverflowMode::THROW
&& settings.max_rows_to_read
&& !query_info.input_order_info)
limits = SizeLimits(settings.max_rows_to_read, 0, settings.read_overflow_mode);
SizeLimits leaf_limits;
if (settings.read_overflow_mode_leaf == OverflowMode::THROW && settings.max_rows_to_read_leaf)
if (settings.read_overflow_mode_leaf == OverflowMode::THROW
&& settings.max_rows_to_read_leaf
&& !query_info.input_order_info)
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf);
auto mark_cache = context->getIndexMarkCache();

View File

@ -20,6 +20,8 @@ struct MergeTreeReaderSettings
bool save_marks_in_cache = false;
/// Validate checksums on reading (should be always enabled in production).
bool checksum_on_read = true;
/// True if we read in order of sorting key.
bool read_in_order = false;
};
struct MergeTreeWriterSettings

View File

@ -39,9 +39,12 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
{
/// Actually it means that parallel reading from replicas enabled
/// and we have to collaborate with initiator.
/// In this case we won't set approximate rows, because it will be accounted multiple times
if (!extension_.has_value())
/// In this case we won't set approximate rows, because it will be accounted multiple times.
/// Also do not count amount of read rows if we read in order of sorting key,
/// because we don't know actual amount of read rows in case when limit is set.
if (!extension_.has_value() && !reader_settings.read_in_order)
addTotalRowsApprox(total_rows);
ordered_names = header_without_virtual_columns.getNames();
}

View File

@ -41,7 +41,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
addTotalRowsApprox(data_part->rows_count);
/// Add columns because we don't want to read empty blocks
injectRequiredColumns(storage, storage_snapshot->metadata, data_part, columns_to_read);
injectRequiredColumns(storage, storage_snapshot, data_part, columns_to_read);
NamesAndTypesList columns_for_reader;
if (take_column_types_from_storage)
{

View File

@ -1,4 +1,6 @@
#include <Common/config.h>
#include "IO/ParallelReadBuffer.h"
#include "IO/IOThreadPool.h"
#include "Parsers/ASTCreateQuery.h"
#if USE_AWS_S3
@ -238,7 +240,8 @@ StorageS3Source::StorageS3Source(
String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket_,
std::shared_ptr<IteratorWrapper> file_iterator_)
std::shared_ptr<IteratorWrapper> file_iterator_,
const size_t download_thread_num_)
: SourceWithProgress(getHeader(sample_block_, need_path, need_file))
, WithContext(context_)
, name(std::move(name_))
@ -254,6 +257,7 @@ StorageS3Source::StorageS3Source(
, with_file_column(need_file)
, with_path_column(need_path)
, file_iterator(file_iterator_)
, download_thread_num(download_thread_num_)
{
initialize();
}
@ -275,28 +279,79 @@ bool StorageS3Source::initialize()
file_path = fs::path(bucket) / current_key;
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(client, bucket, current_key, max_single_read_retries, getContext()->getReadSettings()),
chooseCompressionMethod(current_key, compression_hint));
read_buf = wrapReadBufferWithCompressionMethod(createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint));
auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size, format_settings);
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
if (columns_desc.hasDefaults())
{
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext());
});
builder.addSimpleTransform(
[&](const Block & header)
{ return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext()); });
}
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
initialized = false;
return true;
}
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key)
{
const size_t object_size = DB::S3::getObjectSize(client, bucket, key, false);
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1;
const bool object_too_small = object_size < download_thread_num * download_buffer_size;
if (!use_parallel_download || object_too_small)
{
LOG_TRACE(log, "Downloading object of size {} from S3 in single thread", object_size);
return std::make_unique<ReadBufferFromS3>(client, bucket, key, max_single_read_retries, getContext()->getReadSettings());
}
assert(object_size > 0);
if (download_buffer_size < DBMS_DEFAULT_BUFFER_SIZE)
{
LOG_WARNING(log, "Downloading buffer {} bytes too small, set at least {} bytes", download_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
download_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
}
auto factory = std::make_unique<ReadBufferS3Factory>(
client, bucket, key, download_buffer_size, object_size, max_single_read_retries, getContext()->getReadSettings());
LOG_TRACE(
log, "Downloading from S3 in {} threads. Object size: {}, Range size: {}.", download_thread_num, object_size, download_buffer_size);
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
ContextPtr query_context = CurrentThread::isInitialized() ? CurrentThread::get().getQueryContext() : nullptr;
auto worker_cleanup = [has_running_group = running_group == nullptr](ThreadStatus & thread_status)
{
if (has_running_group)
thread_status.detachQuery(false);
};
auto worker_setup = [query_context = std::move(query_context),
running_group = std::move(running_group)](ThreadStatus & thread_status)
{
/// Save query context if any, because cache implementation needs it.
if (query_context)
thread_status.attachQueryContext(query_context);
/// To be able to pass ProfileEvents.
if (running_group)
thread_status.attachQuery(running_group);
};
return std::make_unique<ParallelReadBuffer>(
std::move(factory), &IOThreadPool::get(), download_thread_num, std::move(worker_setup), std::move(worker_cleanup));
}
String StorageS3Source::getName() const
{
return name;
@ -670,6 +725,7 @@ Pipe StorageS3::read(
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
@ -686,7 +742,8 @@ Pipe StorageS3::read(
compression_method,
client_auth.client,
client_auth.uri.bucket,
iterator_wrapper));
iterator_wrapper,
max_download_threads));
}
auto pipe = Pipe::unitePipes(std::move(pipes));

View File

@ -74,7 +74,8 @@ public:
String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket,
std::shared_ptr<IteratorWrapper> file_iterator_);
std::shared_ptr<IteratorWrapper> file_iterator_,
size_t download_thread_num);
String getName() const override;
@ -101,13 +102,17 @@ private:
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancel and generate can be called concurrently
std::mutex reader_mutex;
bool initialized = false;
bool with_file_column = false;
bool with_path_column = false;
std::shared_ptr<IteratorWrapper> file_iterator;
size_t download_thread_num = 1;
Poco::Logger * log = &Poco::Logger::get("StorageS3Source");
/// Recreate ReadBuffer and BlockInputStream for each file.
bool initialize();
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key);
};
/**

View File

@ -51,40 +51,42 @@ NamesAndTypesList StorageSnapshot::getColumns(const GetColumnsOptions & options)
NamesAndTypesList StorageSnapshot::getColumnsByNames(const GetColumnsOptions & options, const Names & names) const
{
NamesAndTypesList res;
const auto & columns = getMetadataForQuery()->getColumns();
for (const auto & name : names)
res.push_back(getColumn(options, name));
return res;
}
std::optional<NameAndTypePair> StorageSnapshot::tryGetColumn(const GetColumnsOptions & options, const String & column_name) const
{
const auto & columns = getMetadataForQuery()->getColumns();
auto column = columns.tryGetColumn(options, column_name);
if (column && (!isObject(column->type) || !options.with_extended_objects))
return column;
if (options.with_extended_objects)
{
auto column = columns.tryGetColumn(options, name);
if (column && !isObject(column->type))
{
res.emplace_back(std::move(*column));
continue;
}
if (options.with_extended_objects)
{
auto object_column = object_columns.tryGetColumn(options, name);
if (object_column)
{
res.emplace_back(std::move(*object_column));
continue;
}
}
if (options.with_virtuals)
{
auto it = virtual_columns.find(name);
if (it != virtual_columns.end())
{
res.emplace_back(name, it->second);
continue;
}
}
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no column {} in table", name);
auto object_column = object_columns.tryGetColumn(options, column_name);
if (object_column)
return object_column;
}
return res;
if (options.with_virtuals)
{
auto it = virtual_columns.find(column_name);
if (it != virtual_columns.end())
return NameAndTypePair(column_name, it->second);
}
return {};
}
NameAndTypePair StorageSnapshot::getColumn(const GetColumnsOptions & options, const String & column_name) const
{
auto column = tryGetColumn(options, column_name);
if (!column)
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no column {} in table", column_name);
return *column;
}
Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) const

View File

@ -61,6 +61,10 @@ struct StorageSnapshot
/// Get columns with types according to options only for requested names.
NamesAndTypesList getColumnsByNames(const GetColumnsOptions & options, const Names & names) const;
/// Get column with type according to options for requested name.
std::optional<NameAndTypePair> tryGetColumn(const GetColumnsOptions & options, const String & column_name) const;
NameAndTypePair getColumn(const GetColumnsOptions & options, const String & column_name) const;
/// Block with ordinary + materialized + aliases + virtuals + subcolumns.
Block getSampleBlockForColumns(const Names & column_names) const;

View File

@ -54,6 +54,7 @@ def get_packager_cmd(
build_version: str,
image_version: str,
ccache_path: str,
official: bool,
) -> str:
package_type = build_config["package_type"]
comp = build_config["compiler"]
@ -83,6 +84,9 @@ def get_packager_cmd(
if _can_export_binaries(build_config):
cmd += " --with-binaries=tests"
if official:
cmd += " --official"
return cmd
@ -254,9 +258,11 @@ def main():
logging.info("Got version from repo %s", version.string)
official_flag = pr_info.number == 0
version_type = "testing"
if "release" in pr_info.labels or "release-lts" in pr_info.labels:
version_type = "stable"
official_flag = True
update_version_local(REPO_COPY, version, version_type)
@ -290,7 +296,9 @@ def main():
version.string,
image_version,
ccache_path,
official=official_flag,
)
logging.info("Going to run packager with %s", packager_cmd)
build_clickhouse_log = os.path.join(TEMP_PATH, "build_log")

View File

@ -233,7 +233,11 @@ if __name__ == "__main__":
if ok_builds == 0 or some_builds_are_missing:
summary_status = "error"
description = f"{ok_builds}/{total_builds} builds are OK"
addition = ""
if some_builds_are_missing:
addition = "(some builds are missing)"
description = f"{ok_builds}/{total_builds} builds are OK {addition}"
print("::notice ::Report url: {}".format(url))
@ -244,3 +248,6 @@ if __name__ == "__main__":
state=summary_status,
target_url=url,
)
if summary_status == "error":
sys.exit(1)

View File

@ -20,8 +20,6 @@ class Description:
def __init__(self, pull_request):
self.label_name = str()
self.legal = False
self._parse(pull_request["bodyText"])
def _parse(self, text):
@ -39,12 +37,6 @@ class Description:
category = stripped
next_category = False
if (
stripped
== "I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en"
):
self.legal = True
category_headers = (
"Category (leave one):",
"Changelog category (leave one):",

View File

@ -10,13 +10,13 @@ from get_robot_token import get_parameter_from_ssm
class ClickHouseHelper:
def __init__(self, url=None):
if url is None:
self.url = get_parameter_from_ssm("clickhouse-test-stat-url2")
self.auth = {
"X-ClickHouse-User": get_parameter_from_ssm(
"clickhouse-test-stat-login2"
),
"X-ClickHouse-Key": "",
}
url = get_parameter_from_ssm("clickhouse-test-stat-url")
self.url = url
self.auth = {
"X-ClickHouse-User": get_parameter_from_ssm("clickhouse-test-stat-login"),
"X-ClickHouse-Key": get_parameter_from_ssm("clickhouse-test-stat-password"),
}
@staticmethod
def _insert_json_str_info_impl(url, auth, db, table, json_str):
@ -179,7 +179,7 @@ def mark_flaky_tests(clickhouse_helper, check_name, test_results):
check_name=check_name
)
tests_data = clickhouse_helper.select_json_each_row("gh-data", query)
tests_data = clickhouse_helper.select_json_each_row("default", query)
master_failed_tests = {row["test_name"] for row in tests_data}
logging.info("Found flaky tests: %s", ", ".join(master_failed_tests))

View File

@ -59,3 +59,17 @@ def post_commit_status_to_file(file_path, description, state, report_url):
with open(file_path, "w", encoding="utf-8") as f:
out = csv.writer(f, delimiter="\t")
out.writerow([state, report_url, description])
def remove_labels(gh, pr_info, labels_names):
repo = gh.get_repo(GITHUB_REPOSITORY)
pull_request = repo.get_pull(pr_info.number)
for label in labels_names:
pull_request.remove_from_labels(label)
def post_labels(gh, pr_info, labels_names):
repo = gh.get_repo(GITHUB_REPOSITORY)
pull_request = repo.get_pull(pr_info.number)
for label in labels_names:
pull_request.add_to_labels(label)

View File

@ -197,4 +197,8 @@ if __name__ == "__main__":
report_url,
CHECK_NAME,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if state == "error":
sys.exit(1)

View File

@ -7,6 +7,7 @@ import platform
import shutil
import subprocess
import time
import sys
from typing import Dict, List, Optional, Set, Tuple, Union
from github import Github
@ -459,7 +460,10 @@ def main():
NAME,
)
ch_helper = ClickHouseHelper()
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if status == "error":
sys.exit(1)
if __name__ == "__main__":

View File

@ -234,7 +234,7 @@ def main():
NAME,
)
ch_helper = ClickHouseHelper()
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if __name__ == "__main__":

View File

@ -114,4 +114,7 @@ if __name__ == "__main__":
report_url,
NAME,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if status == "error":
sys.exit(1)

View File

@ -204,11 +204,11 @@ if __name__ == "__main__":
report_url,
NAME,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
# Refuse other checks to run if fast test failed
if state != "success":
if "force-tests" in pr_info.labels:
if "force-tests" in pr_info.labels and state != "error":
print("'force-tests' enabled, will report success")
else:
sys.exit(1)

View File

@ -356,7 +356,7 @@ if __name__ == "__main__":
report_url,
check_name_with_group,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if state != "success":
if "force-tests" in pr_info.labels:

View File

@ -279,4 +279,8 @@ if __name__ == "__main__":
report_url,
check_name_with_group,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if state == "error":
sys.exit(1)

View File

@ -271,5 +271,5 @@ if __name__ == "__main__":
report_url,
CHECK_NAME,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
clear_autoscaling_group()

View File

@ -217,3 +217,6 @@ if __name__ == "__main__":
post_commit_status(
gh, pr_info.sha, check_name_with_group, message, status, report_url
)
if status == "error":
sys.exit(1)

View File

@ -236,6 +236,15 @@ class PRInfo:
return True
return False
def has_changes_in_submodules(self):
if self.changed_files is None or not self.changed_files:
return True
for f in self.changed_files:
if "contrib" in f:
return True
return False
def can_skip_builds_and_use_version_from_master(self):
# TODO: See a broken loop
if "force tests" in self.labels:

View File

@ -8,7 +8,7 @@ from github import Github
from env_helper import GITHUB_RUN_URL, GITHUB_REPOSITORY, GITHUB_SERVER_URL
from pr_info import PRInfo
from get_robot_token import get_best_robot_token
from commit_status_helper import get_commit
from commit_status_helper import get_commit, post_labels, remove_labels
NAME = "Run Check (actions)"
@ -22,6 +22,7 @@ OK_SKIP_LABELS = {"release", "pr-backport", "pr-cherrypick"}
CAN_BE_TESTED_LABEL = "can be tested"
DO_NOT_TEST_LABEL = "do not test"
FORCE_TESTS_LABEL = "force tests"
SUBMODULE_CHANGED_LABEL = "submodule changed"
# Individual trusted contirbutors who are not in any trusted organization.
# Can be changed in runtime: we will append users that we learned to be in
@ -81,6 +82,25 @@ TRUSTED_CONTRIBUTORS = {
]
}
MAP_CATEGORY_TO_LABEL = {
"New Feature": "pr-feature",
"Bug Fix": "pr-bugfix",
"Bug Fix (user-visible misbehaviour in official stable or prestable release)": "pr-bugfix",
"Improvement": "pr-improvement",
"Performance Improvement": "pr-performance",
"Backward Incompatible Change": "pr-backward-incompatible",
"Build/Testing/Packaging Improvement": "pr-build",
"Build Improvement": "pr-build",
"Build/Testing Improvement": "pr-build",
"Build": "pr-build",
"Packaging Improvement": "pr-build",
"Not for changelog (changelog entry is not required)": "pr-not-for-changelog",
"Not for changelog": "pr-not-for-changelog",
"Documentation (changelog entry is not required)": "pr-documentation",
"Documentation": "pr-documentation",
# 'Other': doesn't match anything
}
def pr_is_by_trusted_user(pr_user_login, pr_user_orgs):
if pr_user_login.lower() in TRUSTED_CONTRIBUTORS:
@ -168,7 +188,7 @@ def check_pr_description(pr_info):
+ second_category
+ "'"
)
return result_status[:140]
return result_status[:140], category
elif re.match(
r"(?i)^[>*_ ]*(short\s*description|change\s*log\s*entry)", lines[i]
@ -190,30 +210,57 @@ def check_pr_description(pr_info):
i += 1
if not category:
return "Changelog category is empty"
return "Changelog category is empty", category
# Filter out the PR categories that are not for changelog.
if re.match(
r"(?i)doc|((non|in|not|un)[-\s]*significant)|(not[ ]*for[ ]*changelog)",
category,
):
return ""
return "", category
if not entry:
return f"Changelog entry required for category '{category}'"
return f"Changelog entry required for category '{category}'", category
return ""
return "", category
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
pr_info = PRInfo(need_orgs=True, pr_event_from_api=True)
pr_info = PRInfo(need_orgs=True, pr_event_from_api=True, need_changed_files=True)
can_run, description, labels_state = should_run_checks_for_pr(pr_info)
gh = Github(get_best_robot_token())
commit = get_commit(gh, pr_info.sha)
description_report = check_pr_description(pr_info)[:139]
description_report, category = check_pr_description(pr_info)
pr_labels_to_add = []
pr_labels_to_remove = []
if (
category in MAP_CATEGORY_TO_LABEL
and MAP_CATEGORY_TO_LABEL[category] not in pr_info.labels
):
pr_labels_to_add.append(MAP_CATEGORY_TO_LABEL[category])
for label in pr_info.labels:
if (
label in MAP_CATEGORY_TO_LABEL.values()
and category in MAP_CATEGORY_TO_LABEL
and label != MAP_CATEGORY_TO_LABEL[category]
):
pr_labels_to_remove.append(label)
if pr_info.has_changes_in_submodules():
pr_labels_to_add.append(SUBMODULE_CHANGED_LABEL)
elif SUBMODULE_CHANGED_LABEL in pr_info.labels:
pr_labels_to_remove.append(SUBMODULE_CHANGED_LABEL)
if pr_labels_to_add:
post_labels(gh, pr_info, pr_labels_to_add)
if pr_labels_to_remove:
remove_labels(gh, pr_info, pr_labels_to_remove)
if description_report:
print("::notice ::Cannot run, description does not match the template")
logging.info(
@ -225,7 +272,7 @@ if __name__ == "__main__":
)
commit.create_status(
context=NAME,
description=description_report,
description=description_report[:139],
state="failure",
target_url=url,
)

View File

@ -147,4 +147,8 @@ if __name__ == "__main__":
report_url,
CHECK_NAME,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if state == "error":
sys.exit(1)

View File

@ -176,4 +176,7 @@ if __name__ == "__main__":
report_url,
check_name,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if state == "error":
sys.exit(1)

View File

@ -117,4 +117,7 @@ if __name__ == "__main__":
report_url,
NAME,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if state == "error":
sys.exit(1)

View File

@ -173,4 +173,8 @@ if __name__ == "__main__":
report_url,
check_name,
)
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
if state == "error":
sys.exit(1)

View File

@ -116,7 +116,7 @@ def clickhouse_execute_http(base_args, query, timeout=30, settings=None, default
def clickhouse_execute(base_args, query, timeout=30, settings=None):
return clickhouse_execute_http(base_args, query, timeout, settings).strip()
def clickhouse_execute_json(base_args, query, timeout=30, settings=None):
def clickhouse_execute_json(base_args, query, timeout=60, settings=None):
data = clickhouse_execute_http(base_args, query, timeout, settings, 'JSONEachRow')
if not data:
return None

View File

@ -280,4 +280,4 @@ def test_HDFS(start_cluster):
def test_schema_inference(start_cluster):
error = node7.query_and_get_error("desc url('http://test.com`, 'TSVRaw'')")
assert(error.find('ReadWriteBufferFromHTTPBase') == -1)
assert error.find("ReadWriteBufferFromHTTPBase") == -1

View File

@ -94,7 +94,7 @@ def _check_exception(exception, expected_tries=3):
@pytest.fixture(scope="module", params=["configs", "configs_secure"])
def started_cluster(request):
cluster = ClickHouseCluster(__file__)
cluster = ClickHouseCluster(__file__, request.param)
cluster.__with_ssl_config = request.param == "configs_secure"
main_configs = []
main_configs += [os.path.join(request.param, "config.d/remote_servers.xml")]

View File

@ -30,7 +30,7 @@ def test_overcommited_is_killed():
responses_A = list()
responses_B = list()
for _ in range(100):
for _ in range(500):
responses_A.append(node.get_query_request(TEST_QUERY_A, user="A"))
responses_B.append(node.get_query_request(TEST_QUERY_B, user="B"))

View File

@ -0,0 +1,25 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", stay_alive=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_materialized_view_with_subquery(start_cluster):
node.query("create table test (x UInt32) engine=TinyLog()")
node.query(
"create materialized view mv engine = TinyLog() as with subquery as (select * from test) select * from subquery"
)
node.restart_clickhouse(kill=True)
node.query("insert into test select 1")
result = node.query("select * from mv")
assert int(result) == 1

View File

@ -517,7 +517,7 @@ def test_put_get_with_globs(started_cluster):
# ("'minio','minio123',",True), Redirect with credentials not working with nginx.
],
)
def test_multipart_put(started_cluster, maybe_auth, positive):
def test_multipart(started_cluster, maybe_auth, positive):
# type: (ClickHouseCluster) -> None
bucket = (
@ -535,8 +535,9 @@ def test_multipart_put(started_cluster, maybe_auth, positive):
one_line_length = 6 # 3 digits, 2 commas, 1 line separator.
total_rows = csv_size_bytes // one_line_length
# Generate data having size more than one part
int_data = [[1, 2, 3] for i in range(csv_size_bytes // one_line_length)]
int_data = [[1, 2, 3] for i in range(total_rows)]
csv_data = "".join(["{},{},{}\n".format(x, y, z) for x, y, z in int_data])
assert len(csv_data) > min_part_size_bytes
@ -573,6 +574,37 @@ def test_multipart_put(started_cluster, maybe_auth, positive):
assert csv_data == get_s3_file_content(started_cluster, bucket, filename)
# select uploaded data from many threads
select_query = (
"select sum(column1), sum(column2), sum(column3) "
"from s3('http://{host}:{port}/{bucket}/{filename}', {auth}'CSV', '{table_format}')".format(
host=started_cluster.minio_redirect_host,
port=started_cluster.minio_redirect_port,
bucket=bucket,
filename=filename,
auth=maybe_auth,
table_format=table_format,
)
)
try:
select_result = run_query(
instance,
select_query,
settings={
"max_download_threads": random.randint(4, 16),
"max_download_buffer_size": 1024 * 1024,
},
)
except helpers.client.QueryRuntimeException:
if positive:
raise
else:
assert positive
assert (
select_result
== "\t".join(map(str, [total_rows, total_rows * 2, total_rows * 3])) + "\n"
)
def test_remote_host_filter(started_cluster):
instance = started_cluster.instances["restricted_dummy"]

View File

@ -21,3 +21,5 @@ $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE 0 ASYNC"
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE 0 FORMAT TabSeparated"
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE 0 SYNC FORMAT TabSeparated"
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE 1 TEST" &>/dev/null
clickhouse_test_wait_queries 60

View File

@ -1,3 +1,5 @@
-- Tags: no-random-settings
drop table if exists tab_00484;
create table tab_00484 (date Date, x UInt64, s FixedString(128)) engine = MergeTree PARTITION BY date ORDER BY (date, x) SETTINGS min_bytes_for_wide_part = 0;
insert into tab_00484 select today(), number, toFixedString('', 128) from system.numbers limit 8192;

View File

@ -0,0 +1,4 @@
800000 2000000 1400000 900000
800000 2000000 1400000 900000
Tuple(col0 UInt64, col1 UInt64, col2 UInt64, col3 UInt64, col4 UInt64, col5 UInt64, col6 UInt64, col7 UInt64, col8 UInt64)
1600000 4000000 2800000 1800000

View File

@ -0,0 +1,41 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_json;
DROP TABLE IF EXISTS t_map;
SET allow_experimental_object_type = 1;
CREATE TABLE t_json(id UInt64, obj JSON) ENGINE = MergeTree ORDER BY id;
CREATE TABLE t_map(id UInt64, m Map(String, UInt64)) ENGINE = MergeTree ORDER BY id;
INSERT INTO t_map
SELECT
number,
(
arrayMap(x -> 'col' || toString(x), range(number % 10)),
range(number % 10)
)::Map(String, UInt64)
FROM numbers(1000000);
INSERT INTO t_json SELECT id, m FROM t_map;
SELECT sum(m['col1']), sum(m['col4']), sum(m['col7']), sum(m['col8'] = 0) FROM t_map;
SELECT sum(obj.col1), sum(obj.col4), sum(obj.col7), sum(obj.col8 = 0) FROM t_json;
SELECT toTypeName(obj) FROM t_json LIMIT 1;
INSERT INTO t_json
SELECT
number,
(
arrayMap(x -> 'col' || toString(x), range(number % 10)),
range(number % 10)
)::Map(FixedString(4), UInt64)
FROM numbers(1000000);
SELECT sum(obj.col1), sum(obj.col4), sum(obj.col7), sum(obj.col8 = 0) FROM t_json;
INSERT INTO t_json
SELECT number, (range(number % 10), range(number % 10))::Map(UInt64, UInt64)
FROM numbers(1000000); -- { serverError 53 }
DROP TABLE IF EXISTS t_json;
DROP TABLE IF EXISTS t_map;

View File

@ -0,0 +1,2 @@
Tuple(foo Int8, k1 Int8, k2 Int8)
1

View File

@ -0,0 +1,19 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_json;
SET allow_experimental_object_type = 1;
CREATE TABLE t_json(id UInt64, obj JSON)
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0;
SYSTEM STOP MERGES t_json;
INSERT INTO t_json SELECT number, '{"k1": 1, "k2": 2}' FROM numbers(1000000);
INSERT INTO t_json VALUES (1000001, '{"foo": 1}');
SELECT toTypeName(obj) FROM t_json LIMIT 1;
SELECT count() FROM t_json WHERE obj.foo != 0;
DROP TABLE IF EXISTS t_json;

View File

@ -0,0 +1,8 @@
{"id":"1","obj":{"k1":1,"k2":{"k3":"2","k4":[{"k5":3,"k6":0},{"k5":4,"k6":0}]},"some":0},"s":"foo"}
{"id":"2","obj":{"k1":0,"k2":{"k3":"str","k4":[{"k5":0,"k6":55}]},"some":42},"s":"bar"}
Tuple(k1 Int8, k2 Tuple(k3 String, k4 Nested(k5 Int8, k6 Int8)), some Int8)
{"id":"1","obj":"aaa","s":"foo"}
{"id":"2","obj":"bbb","s":"bar"}
{"map":{"k1":1,"k2":2},"obj":{"k1":1,"k2.k3":2},"map_type":"Map(String, Nullable(Float64))","obj_type":"Object('json')"}
{"obj":{"k1":1,"k2":2},"map":{"k1":"1","k2":"2"}}
Tuple(k1 Float64, k2 Float64)

View File

@ -0,0 +1,52 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_inference"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_inference (id UInt64, obj JSON, s String) \
ENGINE = MergeTree ORDER BY id" --allow_experimental_object_type 1
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
mkdir -p ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/
rm -rf ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}/*
filename="${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/data.json"
echo '{"id": 1, "obj": {"k1": 1, "k2": {"k3": 2, "k4": [{"k5": 3}, {"k5": 4}]}}, "s": "foo"}' > $filename
echo '{"id": 2, "obj": {"k2": {"k3": "str", "k4": [{"k6": 55}]}, "some": 42}, "s": "bar"}' >> $filename
${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_inference SELECT * FROM file('${CLICKHOUSE_TEST_UNIQUE_NAME}/data.json', 'JSONEachRow')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM t_json_inference FORMAT JSONEachRow" --output_format_json_named_tuples_as_objects 1
${CLICKHOUSE_CLIENT} -q "SELECT toTypeName(obj) FROM t_json_inference LIMIT 1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_inference"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_inference (id UInt64, obj String, s String) ENGINE = MergeTree ORDER BY id"
echo '{"obj": "aaa", "id": 1, "s": "foo"}' > $filename
echo '{"id": 2, "obj": "bbb", "s": "bar"}' >> $filename
${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_inference SELECT * FROM file('${CLICKHOUSE_TEST_UNIQUE_NAME}/data.json', 'JSONEachRow')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM t_json_inference FORMAT JSONEachRow" --output_format_json_named_tuples_as_objects 1
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_inference"
echo '{"map": {"k1": 1, "k2": 2}, "obj": {"k1": 1, "k2": {"k3": 2}}}' > $filename
${CLICKHOUSE_CLIENT} -q "SELECT map, obj, toTypeName(map) AS map_type, toTypeName(obj) AS obj_type \
FROM file('${CLICKHOUSE_TEST_UNIQUE_NAME}/data.json', 'JSONEachRow') FORMAT JSONEachRow" --output_format_json_named_tuples_as_objects 1
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_inference (obj JSON, map Map(String, UInt64)) \
ENGINE = MergeTree ORDER BY tuple()" --allow_experimental_object_type 1
echo '{"map": {"k1": 1, "k2": 2}, "obj": {"k1": 1, "k2": 2}}' > $filename
${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_inference SELECT * FROM file('${CLICKHOUSE_TEST_UNIQUE_NAME}/data.json', 'JSONEachRow')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM t_json_inference FORMAT JSONEachRow" --output_format_json_named_tuples_as_objects 1
${CLICKHOUSE_CLIENT} -q "SELECT toTypeName(obj) FROM t_json_inference LIMIT 1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_inference"

View File

@ -23,7 +23,7 @@ function run_and_check()
echo "Checking $*"
# Run query with external table (implicit StorageMemory user)
$CLICKHOUSE_CURL -sS -F "s=@$tmp_file;" "$CLICKHOUSE_URL&s_structure=key+Int&query=SELECT+count()+FROM+s&memory_profiler_sample_probability=1&query_id=$query_id&$*" -o /dev/null
$CLICKHOUSE_CURL -sS -F "s=@$tmp_file;" "$CLICKHOUSE_URL&s_structure=key+Int&query=SELECT+count()+FROM+s&memory_profiler_sample_probability=1&max_untracked_memory=0&query_id=$query_id&$*" -o /dev/null
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" --data-binary @- <<<'SYSTEM FLUSH LOGS'

View File

@ -0,0 +1,6 @@
10
0
1
2
3
4

View File

@ -0,0 +1,22 @@
DROP TABLE IF EXISTS t_max_rows_to_read;
CREATE TABLE t_max_rows_to_read (a UInt64)
ENGINE = MergeTree ORDER BY a
SETTINGS index_granularity = 4;
INSERT INTO t_max_rows_to_read SELECT number FROM numbers(100);
SET max_threads = 1;
SELECT a FROM t_max_rows_to_read WHERE a = 10 SETTINGS max_rows_to_read = 4;
SELECT a FROM t_max_rows_to_read ORDER BY a LIMIT 5 SETTINGS max_rows_to_read = 12;
-- This should work, but actually it doesn't. Need to investigate.
-- SELECT a FROM t_max_rows_to_read WHERE a > 10 ORDER BY a LIMIT 5 SETTINGS max_rows_to_read = 20;
SELECT a FROM t_max_rows_to_read ORDER BY a LIMIT 20 FORMAT Null SETTINGS max_rows_to_read = 12; -- { serverError 158 }
SELECT a FROM t_max_rows_to_read WHERE a > 10 ORDER BY a LIMIT 5 FORMAT Null SETTINGS max_rows_to_read = 12; -- { serverError 158 }
SELECT a FROM t_max_rows_to_read WHERE a = 10 OR a = 20 FORMAT Null SETTINGS max_rows_to_read = 4; -- { serverError 158 }
DROP TABLE t_max_rows_to_read;

View File

@ -0,0 +1 @@
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000

File diff suppressed because one or more lines are too long

View File

@ -16,7 +16,7 @@ ${CLICKHOUSE_CLIENT} -q "CREATE ROLE r02246"
${CLICKHOUSE_CLIENT} -q "CREATE USER u02246"
${CLICKHOUSE_CLIENT} -q "GRANT INSERT ON async_inserts_02246 TO r02246"
${CLICKHOUSE_CLIENT} -q "GRANT r02246 to u02246"
${CLICKHOUSE_CLIENT} -q "CREATE QUOTA q02246 FOR INTERVAL 1 HOUR MAX QUERY INSERTS = 2 TO r02246"
${CLICKHOUSE_CLIENT} -q "CREATE QUOTA q02246 FOR INTERVAL 100 YEAR MAX QUERY INSERTS = 2 TO r02246"
${CLICKHOUSE_CLIENT} --user u02246 --async_insert 1 -q "INSERT INTO async_inserts_02246 VALUES (1, 'a')"
${CLICKHOUSE_CLIENT} --user u02246 --async_insert 1 -q "INSERT INTO async_inserts_02246 VALUES (2, 'b')"

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
dir=${CLICKHOUSE_TEST_UNIQUE_NAME}
[[ -d $dir ]] && rm -r $dir
mkdir $dir
$CLICKHOUSE_LOCAL --multiline --multiquery --path $dir -q """
DROP DATABASE IF EXISTS test;
CREATE DATABASE IF NOT EXISTS test;
USE test;
CREATE TABLE test (id Int32) ENGINE=MergeTree() ORDER BY id;
DROP DATABASE test;
"""
$CLICKHOUSE_LOCAL --multiline --multiquery -q """
DROP DATABASE IF EXISTS test;
CREATE DATABASE IF NOT EXISTS test;
USE test;
CREATE TABLE test (id Int32) ENGINE=MergeTree() ORDER BY id;
DROP DATABASE test;
"""

View File

@ -0,0 +1,4 @@
([1,2],['a','b'],3,'c',4) Tuple(`t1.a` Array(UInt32), `t1.s` Array(String), b UInt32, `t2.k` String, `t2.v` UInt32)
Tuple(id Int8, obj Tuple(k1 Int8, k2 Tuple(k3 String, k4 Nested(k5 Int8, k6 Int8)), some Int8), s String) Tuple(id Int8, `obj.k1` Int8, `obj.k2.k3` String, `obj.k2.k4.k5` Array(Int8), `obj.k2.k4.k6` Array(Int8), `obj.some` Int8, s String)
1 1 2 [3,4] [0,0] 0 foo
2 0 str [0] [55] 42 bar

View File

@ -0,0 +1,24 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_flatten_tuple;
DROP TABLE IF EXISTS t_flatten_object;
SET flatten_nested = 0;
CREATE TABLE t_flatten_tuple(t Tuple(t1 Nested(a UInt32, s String), b UInt32, t2 Tuple(k String, v UInt32))) ENGINE = Memory;
INSERT INTO t_flatten_tuple VALUES (([(1, 'a'), (2, 'b')], 3, ('c', 4)));
SELECT flattenTuple(t) AS ft, toTypeName(ft) FROM t_flatten_tuple;
SET allow_experimental_object_type = 1;
CREATE TABLE t_flatten_object(data JSON) ENGINE = Memory;
INSERT INTO t_flatten_object VALUES ('{"id": 1, "obj": {"k1": 1, "k2": {"k3": 2, "k4": [{"k5": 3}, {"k5": 4}]}}, "s": "foo"}');
INSERT INTO t_flatten_object VALUES ('{"id": 2, "obj": {"k2": {"k3": "str", "k4": [{"k6": 55}]}, "some": 42}, "s": "bar"}');
SELECT toTypeName(data), toTypeName(flattenTuple(data)) FROM t_flatten_object LIMIT 1;
SELECT untuple(flattenTuple(data)) FROM t_flatten_object ORDER BY data.id;
DROP TABLE IF EXISTS t_flatten_tuple;
DROP TABLE IF EXISTS t_flatten_object;

View File

@ -1 +0,0 @@
# -*- coding: utf-8 -*-

Some files were not shown because too many files have changed in this diff Show More