Merge branch 'master' into max_cache_download_limit

This commit is contained in:
Han Shukai 2022-10-18 20:28:04 +08:00 committed by GitHub
commit acf13a7b60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 123 additions and 43 deletions

View File

@ -84,7 +84,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 1000, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \

View File

@ -22,11 +22,6 @@ namespace ErrorCodes
namespace
{
/// The regex-based code style check script in CI complains when it sees "ErrorCodes:: ErrorCode" (space added to avoid another match).
/// Because this expression is only used in this file, don't add some suppression mechanism to the already complex style checker, instead
/// work around by creating a namespace alias.
namespace ErrorCodeAlias = ErrorCodes;
/// Throw an exception if the argument is non zero.
class FunctionThrowIf : public IFunction
{
@ -93,7 +88,7 @@ public:
custom_message = message_column->getValue<String>();
}
std::optional<ErrorCodeAlias::ErrorCode> custom_error_code;
std::optional<ErrorCodes::ErrorCode> custom_error_code;
if (allow_custom_error_code_argument && arguments.size() == 3)
{
if (!isColumnConst(*(arguments[2].column)))
@ -125,7 +120,7 @@ public:
private:
template <typename T>
ColumnPtr execute(const IColumn * in_untyped, const std::optional<String> & message, const std::optional<ErrorCodeAlias::ErrorCode> & error_code) const
ColumnPtr execute(const IColumn * in_untyped, const std::optional<String> & message, const std::optional<ErrorCodes::ErrorCode> & error_code) const
{
const auto * in = checkAndGetColumn<ColumnVector<T>>(in_untyped);

View File

@ -811,6 +811,11 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
return AggregatedDataVariants::Type::low_cardinality_key32;
if (size_of_field == 8)
return AggregatedDataVariants::Type::low_cardinality_key64;
if (size_of_field == 16)
return AggregatedDataVariants::Type::low_cardinality_keys128;
if (size_of_field == 32)
return AggregatedDataVariants::Type::low_cardinality_keys256;
throw Exception("Logical error: low cardinality numeric column has sizeOfField not in 1, 2, 4, 8, 16, 32.", ErrorCodes::LOGICAL_ERROR);
}
if (size_of_field == 1)

View File

@ -232,6 +232,11 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
data->type = Type::CROSS;
sample_block_with_columns_to_add = right_sample_block;
}
else if (table_join->getClauses().empty())
{
data->type = Type::EMPTY;
sample_block_with_columns_to_add = right_sample_block;
}
else if (table_join->oneDisjunct())
{
const auto & key_names_right = table_join->getOnlyClause().key_names_right;

View File

@ -378,7 +378,7 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState &
}
else if (type == ASTAlterCommand::FREEZE_ALL)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "FREEZE";
settings.ostr << (settings.hilite ? hilite_keyword : "") << "FREEZE" << (settings.hilite ? hilite_none : "");
if (!with_name.empty())
{
@ -399,7 +399,7 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState &
}
else if (type == ASTAlterCommand::UNFREEZE_ALL)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "UNFREEZE";
settings.ostr << (settings.hilite ? hilite_keyword : "") << "UNFREEZE" << (settings.hilite ? hilite_none : "");
if (!with_name.empty())
{

View File

@ -4,11 +4,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
Block ArrayJoinTransform::transformHeader(Block header, const ArrayJoinActionPtr & array_join)
{
array_join->execute(header);

View File

@ -120,8 +120,15 @@ namespace
std::pair<String, String> getPathFromUriAndUriWithoutPath(const String & uri)
{
const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
return {uri.substr(begin_of_path), uri.substr(0, begin_of_path)};
auto pos = uri.find("//");
if (pos != std::string::npos && pos + 2 < uri.length())
{
pos = uri.find('/', pos + 2);
if (pos != std::string::npos)
return {uri.substr(pos), uri.substr(0, pos)};
}
throw Exception("Storage HDFS requires valid URL to be set", ErrorCodes::BAD_ARGUMENTS);
}
std::vector<String> getPathsList(const String & path_from_uri, const String & uri_without_path, ContextPtr context, std::unordered_map<String, time_t> * last_mod_times = nullptr)

View File

@ -66,7 +66,7 @@ private:
StorageMetadataPtr metadata_snapshot;
bool deduplicate;
Names deduplicate_by_columns;
std::shared_ptr<MergeMutateSelectedEntry> merge_mutate_entry{nullptr};
MergeMutateSelectedEntryPtr merge_mutate_entry{nullptr};
TableLockHolder table_lock_holder;
FutureMergedMutatedPartPtr future_part{nullptr};
MergeTreeData::MutableDataPartPtr new_part;

View File

@ -798,7 +798,7 @@ void StorageMergeTree::loadMutations()
increment.value = std::max(increment.value.load(), current_mutations_by_version.rbegin()->first);
}
std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
const StorageMetadataPtr & metadata_snapshot,
bool aggressive,
const String & partition_id,
@ -943,7 +943,7 @@ bool StorageMergeTree::merge(
SelectPartsDecision select_decision;
std::shared_ptr<MergeMutateSelectedEntry> merge_mutate_entry;
MergeMutateSelectedEntryPtr merge_mutate_entry;
{
std::unique_lock lock(currently_processing_in_background_mutex);
@ -989,7 +989,7 @@ bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & p
return currently_merging_mutating_parts.contains(part);
}
std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String * /* disable_reason */, TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/)
{
@ -1132,7 +1132,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
assert(!isStaticStorage());
auto metadata_snapshot = getInMemoryMetadataPtr();
std::shared_ptr<MergeMutateSelectedEntry> merge_entry, mutate_entry;
MergeMutateSelectedEntryPtr merge_entry, mutate_entry;
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);

View File

@ -187,7 +187,7 @@ private:
friend struct CurrentlyMergingPartsTagger;
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMerge(
MergeMutateSelectedEntryPtr selectPartsToMerge(
const StorageMetadataPtr & metadata_snapshot,
bool aggressive,
const String & partition_id,
@ -200,7 +200,7 @@ private:
SelectPartsDecision * select_decision_out = nullptr);
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMutate(
MergeMutateSelectedEntryPtr selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String * disable_reason,
TableLockHolder & table_lock_holder, std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock);

View File

@ -7673,14 +7673,14 @@ namespace
/// But sometimes we need an opposite. When we deleting all_0_0_0_1 it can be non replicated to other replicas, so we are the only owner of this part.
/// In this case when we will drop all_0_0_0_1 we will drop blobs for all_0_0_0. But it will lead to dataloss. For such case we need to check that other replicas
/// still need parent part.
NameSet getParentLockedBlobs(zkutil::ZooKeeperPtr zookeeper_ptr, const std::string & zero_copy_part_path_prefix, const std::string & part_info_str, MergeTreeDataFormatVersion format_version, Poco::Logger * log)
std::pair<bool, NameSet> getParentLockedBlobs(zkutil::ZooKeeperPtr zookeeper_ptr, const std::string & zero_copy_part_path_prefix, const std::string & part_info_str, MergeTreeDataFormatVersion format_version, Poco::Logger * log)
{
NameSet files_not_to_remove;
MergeTreePartInfo part_info = MergeTreePartInfo::fromPartName(part_info_str, format_version);
/// No mutations -- no hardlinks -- no issues
if (part_info.mutation == 0)
return files_not_to_remove;
return {false, files_not_to_remove};
/// Getting all zero copy parts
Strings parts_str;
@ -7725,10 +7725,10 @@ NameSet getParentLockedBlobs(zkutil::ZooKeeperPtr zookeeper_ptr, const std::stri
LOG_TRACE(log, "Found files not to remove from parent part {}: [{}]", part_candidate_info_str, fmt::join(files_not_to_remove, ", "));
}
break;
return {true, files_not_to_remove};
}
}
return files_not_to_remove;
return {false, files_not_to_remove};
}
}
@ -7754,7 +7754,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
if (!files_not_to_remove_str.empty())
boost::split(files_not_to_remove, files_not_to_remove_str, boost::is_any_of("\n "));
auto parent_not_to_remove = getParentLockedBlobs(zookeeper_ptr, fs::path(zc_zookeeper_path).parent_path(), part_name, data_format_version, logger);
auto [has_parent, parent_not_to_remove] = getParentLockedBlobs(zookeeper_ptr, fs::path(zc_zookeeper_path).parent_path(), part_name, data_format_version, logger);
files_not_to_remove.insert(parent_not_to_remove.begin(), parent_not_to_remove.end());
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / part_id;
@ -7764,9 +7764,23 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
LOG_TRACE(logger, "Remove zookeeper lock {} for part {}", zookeeper_part_replica_node, part_name);
if (auto ec = zookeeper_ptr->tryRemove(zookeeper_part_replica_node); ec != Coordination::Error::ZOK && ec != Coordination::Error::ZNONODE)
if (auto ec = zookeeper_ptr->tryRemove(zookeeper_part_replica_node); ec != Coordination::Error::ZOK)
{
throw zkutil::KeeperException(ec, zookeeper_part_replica_node);
/// Very complex case. It means that lock already doesn't exist when we tried to remove it.
/// So we don't know are we owner of this part or not. Maybe we just mutated it, renamed on disk and failed to lock in ZK.
/// But during mutation we can have hardlinks to another part. So it's not Ok to remove blobs of this part if it was mutated.
if (ec == Coordination::Error::ZNONODE)
{
if (has_parent)
{
LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, refuse to remove blobs", zookeeper_part_replica_node, part_name);
return {false, {}};
}
}
else
{
throw zkutil::KeeperException(ec, zookeeper_part_replica_node);
}
}
/// Check, maybe we were the last replica and can remove part forever

View File

@ -0,0 +1,5 @@
-- Tags: no-fasttest, no-cpu-aarch64
SELECT * FROM hdfsCluster('test_shard_localhost', '', 'TSV'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM hdfsCluster('test_shard_localhost', ' ', 'TSV'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM hdfsCluster('test_shard_localhost', '/', 'TSV'); -- { serverError BAD_ARGUMENTS }
SELECT * FROM hdfsCluster('test_shard_localhost', 'http/', 'TSV'); -- { serverError BAD_ARGUMENTS }

View File

@ -0,0 +1,20 @@
0 4950
1 14950
2 24950
3 34950
4 44950
5 54950
6 64950
7 74950
8 84950
9 94950
0 4950
1 14950
2 24950
3 34950
4 44950
5 54950
6 64950
7 74950
8 84950
9 94950

View File

@ -0,0 +1,9 @@
SET allow_suspicious_low_cardinality_types = 1;
-- LC UInt128
CREATE TABLE group_by_pk_lc_uint128 (`k` LowCardinality(UInt128), `v` UInt32) ENGINE = MergeTree ORDER BY k PARTITION BY v%50;
INSERT INTO group_by_pk_lc_uint128 SELECT number / 100, number FROM numbers(1000);
SELECT k, sum(v) AS s FROM group_by_pk_lc_uint128 GROUP BY k ORDER BY k ASC LIMIT 1024 SETTINGS optimize_aggregation_in_order = 1;
-- LC UInt256
CREATE TABLE group_by_pk_lc_uint256 (`k` LowCardinality(UInt256), `v` UInt32) ENGINE = MergeTree ORDER BY k PARTITION BY v%50;
INSERT INTO group_by_pk_lc_uint256 SELECT number / 100, number FROM numbers(1000);
SELECT k, sum(v) AS s FROM group_by_pk_lc_uint256 GROUP BY k ORDER BY k ASC LIMIT 1024 SETTINGS optimize_aggregation_in_order = 1;

View File

@ -0,0 +1,2 @@
1 0
\N 1

View File

@ -0,0 +1,12 @@
DROP TABLE IF EXISTS t1__fuzz_13;
DROP TABLE IF EXISTS t2__fuzz_47;
SET allow_suspicious_low_cardinality_types = 1;
CREATE TABLE t1__fuzz_13 (id Nullable(Int16)) ENGINE = MergeTree() ORDER BY id SETTINGS allow_nullable_key = 1;
CREATE TABLE t2__fuzz_47 (id LowCardinality(Int16)) ENGINE = MergeTree() ORDER BY id;
INSERT INTO t1__fuzz_13 VALUES (1);
INSERT INTO t2__fuzz_47 VALUES (1);
SELECT * FROM t1__fuzz_13 FULL OUTER JOIN t2__fuzz_47 ON 1 = 2;

View File

@ -59,10 +59,7 @@ declare -A EXTERN_TYPES
EXTERN_TYPES[ErrorCodes]=int
EXTERN_TYPES[ProfileEvents]=Event
EXTERN_TYPES[CurrentMetrics]=Metric
declare -A EXTERN_ALLOWED_CHARS
EXTERN_ALLOWED_CHARS[ErrorCodes]='_A-Z'
EXTERN_ALLOWED_CHARS[ProfileEvents]='_A-Za-z'
EXTERN_ALLOWED_CHARS[CurrentMetrics]='_A-Za-z'
EXTERN_TYPES_EXCLUDES=(
ProfileEvents::global_counters
ProfileEvents::Event
@ -87,18 +84,30 @@ EXTERN_TYPES_EXCLUDES=(
CurrentMetrics::Metric
CurrentMetrics::values
CurrentMetrics::Value
ErrorCodes::ErrorCode
ErrorCodes::getName
ErrorCodes::increment
ErrorCodes::end
ErrorCodes::values
ErrorCodes::values[i]
ErrorCodes::getErrorCodeByName
)
for extern_type in ${!EXTERN_TYPES[@]}; do
type_of_extern=${EXTERN_TYPES[$extern_type]}
allowed_chars=${EXTERN_ALLOWED_CHARS[$extern_type]}
allowed_chars='[_A-Za-z]+'
# Unused
# NOTE: to fix automatically, replace echo with:
# sed -i "/extern const $type_of_extern $val/d" $file
find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' | {
grep -vP $EXCLUDE_DIRS | xargs grep -l -P "extern const $type_of_extern [$allowed_chars]+"
# NOTE: the check is pretty dumb and distinguish only by the type_of_extern,
# and this matches with zkutil::CreateMode
grep -v 'src/Common/ZooKeeper/Types.h'
} | {
grep -vP $EXCLUDE_DIRS | xargs grep -l -P "extern const $type_of_extern $allowed_chars"
} | while read file; do
grep -P "extern const $type_of_extern [$allowed_chars]+;" $file | sed -r -e "s/^.*?extern const $type_of_extern ([$allowed_chars]+);.*?$/\1/" | while read val; do
grep -P "extern const $type_of_extern $allowed_chars;" $file | sed -r -e "s/^.*?extern const $type_of_extern ($allowed_chars);.*?$/\1/" | while read val; do
if ! grep -q "$extern_type::$val" $file; then
# Excludes for SOFTWARE_EVENT/HARDWARE_EVENT/CACHE_EVENT in ThreadProfileEvents.cpp
if [[ ! $extern_type::$val =~ ProfileEvents::Perf.* ]]; then
@ -110,11 +119,13 @@ for extern_type in ${!EXTERN_TYPES[@]}; do
# Undefined
# NOTE: to fix automatically, replace echo with:
# ( grep -q -F 'namespace $extern_type' $file && sed -i -r "0,/(\s*)extern const $type_of_extern [$allowed_chars]+/s//\1extern const $type_of_extern $val;\n&/" $file || awk '{ print; if (ns == 1) { ns = 2 }; if (ns == 2) { ns = 0; print "namespace $extern_type\n{\n extern const $type_of_extern '$val';\n}" } }; /namespace DB/ { ns = 1; };' < $file > ${file}.tmp && mv ${file}.tmp $file )
# ( grep -q -F 'namespace $extern_type' $file && \
# sed -i -r "0,/(\s*)extern const $type_of_extern [$allowed_chars]+/s//\1extern const $type_of_extern $val;\n&/" $file || \
# awk '{ print; if (ns == 1) { ns = 2 }; if (ns == 2) { ns = 0; print "namespace $extern_type\n{\n extern const $type_of_extern '$val';\n}" } }; /namespace DB/ { ns = 1; };' < $file > ${file}.tmp && mv ${file}.tmp $file )
find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' | {
grep -vP $EXCLUDE_DIRS | xargs grep -l -P "$extern_type::[$allowed_chars]+"
grep -vP $EXCLUDE_DIRS | xargs grep -l -P "$extern_type::$allowed_chars"
} | while read file; do
grep -P "$extern_type::[$allowed_chars]+" $file | sed -r -e "s/^.*?$extern_type::([$allowed_chars]+).*?$/\1/" | while read val; do
grep -P "$extern_type::$allowed_chars" $file | grep -P -v '^\s*//' | sed -r -e "s/^.*?$extern_type::($allowed_chars).*?$/\1/" | while read val; do
if ! grep -q "extern const $type_of_extern $val" $file; then
if ! in_array "$extern_type::$val" "${EXTERN_TYPES_EXCLUDES[@]}"; then
echo "$extern_type::$val is used in file $file but not defined"
@ -125,9 +136,9 @@ for extern_type in ${!EXTERN_TYPES[@]}; do
# Duplicates
find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' | {
grep -vP $EXCLUDE_DIRS | xargs grep -l -P "$extern_type::[$allowed_chars]+"
grep -vP $EXCLUDE_DIRS | xargs grep -l -P "$extern_type::$allowed_chars"
} | while read file; do
grep -P "extern const $type_of_extern [$allowed_chars]+;" $file | sort | uniq -c | grep -v -P ' +1 ' && echo "Duplicate $extern_type in file $file"
grep -P "extern const $type_of_extern $allowed_chars;" $file | sort | uniq -c | grep -v -P ' +1 ' && echo "Duplicate $extern_type in file $file"
done
done