Merge branch 'Kusto-phase1' of github.com:ClibMouse/ClickHouse into Kusto-phase1

This commit is contained in:
Yong Wang 2022-09-05 22:29:21 -07:00
commit 7e9f4300fd
33 changed files with 377 additions and 220 deletions

View File

@ -1254,6 +1254,228 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestS3Debug0:
needs: [BuilderDebDebug]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_debug
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (debug, s3 storage)
REPO_COPY=${{runner.temp}}/stateless_s3_storage_debug/ClickHouse
KILL_TIMEOUT=10800
RUN_BY_HASH_NUM=0
RUN_BY_HASH_TOTAL=3
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestS3Debug1:
needs: [BuilderDebDebug]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_debug
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (debug, s3 storage)
REPO_COPY=${{runner.temp}}/stateless_s3_storage_debug/ClickHouse
KILL_TIMEOUT=10800
RUN_BY_HASH_NUM=1
RUN_BY_HASH_TOTAL=3
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestS3Debug2:
needs: [BuilderDebDebug]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_debug
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (debug, s3 storage)
REPO_COPY=${{runner.temp}}/stateless_s3_storage_debug/ClickHouse
KILL_TIMEOUT=10800
RUN_BY_HASH_NUM=2
RUN_BY_HASH_TOTAL=3
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestS3Tsan0:
needs: [BuilderDebTsan]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_tsan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (tsan, s3 storage)
REPO_COPY=${{runner.temp}}/stateless_s3_storage_tsan/ClickHouse
KILL_TIMEOUT=10800
RUN_BY_HASH_NUM=0
RUN_BY_HASH_TOTAL=3
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestS3Tsan1:
needs: [BuilderDebTsan]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_tsan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (tsan, s3 storage)
REPO_COPY=${{runner.temp}}/stateless_s3_storage_tsan/ClickHouse
KILL_TIMEOUT=10800
RUN_BY_HASH_NUM=1
RUN_BY_HASH_TOTAL=3
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestS3Tsan2:
needs: [BuilderDebTsan]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_tsan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (tsan, s3 storage)
REPO_COPY=${{runner.temp}}/stateless_s3_storage_tsan/ClickHouse
KILL_TIMEOUT=10800
RUN_BY_HASH_NUM=2
RUN_BY_HASH_TOTAL=3
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestAarch64:
needs: [BuilderDebAarch64]
runs-on: [self-hosted, func-tester-aarch64]
@ -3388,6 +3610,12 @@ jobs:
- FunctionalStatefulTestMsan
- FunctionalStatefulTestUBsan
- FunctionalStatelessTestReleaseS3
- FunctionalStatelessTestS3Debug0
- FunctionalStatelessTestS3Debug1
- FunctionalStatelessTestS3Debug2
- FunctionalStatelessTestS3Tsan0
- FunctionalStatelessTestS3Tsan1
- FunctionalStatelessTestS3Tsan2
- StressTestDebug
- StressTestAsan
- StressTestTsan

View File

@ -3,6 +3,9 @@
# shellcheck disable=SC2086
# shellcheck disable=SC2024
# Avoid overlaps with previous runs
dmesg --clear
set -x
# Thread Fuzzer allows to check more permutations of possible thread scheduling

View File

@ -1036,7 +1036,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
try
{
LOG_DEBUG(
log, "Initiailizing merge tree metadata cache lru_cache_size:{} continue_if_corrupted:{}", size, continue_if_corrupted);
log, "Initializing merge tree metadata cache lru_cache_size:{} continue_if_corrupted:{}", size, continue_if_corrupted);
global_context->initializeMergeTreeMetadataCache(path_str + "/" + "rocksdb", size);
}
catch (...)
@ -1089,7 +1089,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}
LOG_DEBUG(log, "Initiailizing interserver credentials.");
LOG_DEBUG(log, "Initializing interserver credentials.");
global_context->updateInterserverCredentials(config());
if (config().has("macros"))

View File

@ -250,7 +250,7 @@ private:
}
}
SlotCount available(std::unique_lock<std::mutex> &)
SlotCount available(std::unique_lock<std::mutex> &) const
{
if (cur_concurrency < max_concurrency)
return max_concurrency - cur_concurrency;

View File

@ -61,7 +61,7 @@ enum class QueryCancellationState
// Usually it's hard to set some reasonable hard memory limit
// (especially, the default value). This class introduces new
// mechanisim for the limiting of memory usage.
// mechanism for the limiting of memory usage.
// Soft limit represents guaranteed amount of memory query/user
// may use. It's allowed to exceed this limit. But if hard limit
// is reached, query with the biggest overcommit ratio
@ -82,7 +82,7 @@ protected:
virtual void pickQueryToExcludeImpl() = 0;
// This mutex is used to disallow concurrent access
// to picked_tracker and cancelation_state variables.
// to picked_tracker and cancellation_state variables.
std::mutex overcommit_m;
std::condition_variable cv;

View File

@ -80,7 +80,7 @@ enum class Error : int32_t
ZUNIMPLEMENTED = -6, /// Operation is unimplemented
ZOPERATIONTIMEOUT = -7, /// Operation timeout
ZBADARGUMENTS = -8, /// Invalid arguments
ZINVALIDSTATE = -9, /// Invliad zhandle state
ZINVALIDSTATE = -9, /// Invalid zhandle state
/** API errors.
* This is never thrown by the server, it shouldn't be used other than
@ -445,7 +445,7 @@ public:
/** Usage scenario:
* - create an object and issue commands;
* - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
* for example, just signal a condvar / fulfull a promise.
* for example, just signal a condvar / fulfill a promise.
* - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap.
* - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true,
* the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.

View File

@ -171,7 +171,7 @@ public:
*/
virtual bool isSuitableForConstantFolding() const { return true; }
/** If function isSuitableForConstantFolding then, this method will be called during query analyzis
/** If function isSuitableForConstantFolding then, this method will be called during query analysis
* if some arguments are constants. For example logical functions (AndFunction, OrFunction) can
* return they result based on some constant arguments.
* Arguments are passed without modifications, useDefaultImplementationForNulls, useDefaultImplementationForNothing,
@ -394,7 +394,7 @@ private:
using FunctionOverloadResolverPtr = std::shared_ptr<IFunctionOverloadResolver>;
/// Old function interface. Check documentation in IFunction.h.
/// If client do not need statefull properties it can implement this interface.
/// If client do not need stateful properties it can implement this interface.
class IFunction
{
public:

View File

@ -753,7 +753,7 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
"if you want to clean the data and drop this replica", ErrorCodes::TABLE_WAS_NOT_DROPPED);
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
/// However, the main usecase is to drop dead replica, which cannot become active.
/// However, the main use case is to drop dead replica, which cannot become active.
/// This check prevents only from accidental drop of some other replica.
if (zookeeper->exists(status.zookeeper_path + "/replicas/" + query.replica + "/is_active"))
throw Exception("Can't drop replica: " + query.replica + ", because it's active",

View File

@ -349,7 +349,7 @@ CNFQuery & CNFQuery::pullNotOutFunctions()
return *this;
}
CNFQuery & CNFQuery::pushNotInFuntions()
CNFQuery & CNFQuery::pushNotInFunctions()
{
transformAtoms([](const AtomicFormula & atom) -> AtomicFormula
{

View File

@ -133,7 +133,7 @@ public:
/// Converts != -> NOT =; <,>= -> (NOT) <; >,<= -> (NOT) <= for simpler matching
CNFQuery & pullNotOutFunctions();
/// Revert pullNotOutFunctions actions
CNFQuery & pushNotInFuntions();
CNFQuery & pushNotInFunctions();
/// (a OR b OR ...) AND (NOT a OR b OR ...) -> (b OR ...)
CNFQuery & reduce();

View File

@ -154,7 +154,7 @@ void optimizeGroupBy(ASTSelectQuery * select_query, ContextPtr context)
continue;
}
}
/// don't optimise functions that shadow any of it's arguments, e.g.:
/// don't optimize functions that shadow any of it's arguments, e.g.:
/// SELECT toString(dummy) as dummy FROM system.one GROUP BY dummy;
if (!function->alias.empty())
{
@ -632,7 +632,7 @@ bool convertQueryToCNF(ASTSelectQuery * select_query)
if (!cnf_form)
return false;
cnf_form->pushNotInFuntions();
cnf_form->pushNotInFunctions();
select_query->refWhere() = TreeCNFConverter::fromCNF(*cnf_form);
return true;
}

View File

@ -99,7 +99,7 @@ using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
/// Tree Rewriter in terms of CMU slides @sa https://15721.courses.cs.cmu.edu/spring2020/slides/19-optimizer1.pdf
///
/// Optimises AST tree and collect information for further expression analysis in ExpressionAnalyzer.
/// Optimizes AST tree and collect information for further expression analysis in ExpressionAnalyzer.
/// Result AST has the following invariants:
/// * all aliases are substituted
/// * qualified names are translated

View File

@ -170,7 +170,7 @@ void WhereConstraintsOptimizer::perform()
return replaceTermsToConstants(atom, compare_graph);
})
.reduce()
.pushNotInFuntions();
.pushNotInFunctions();
if (optimize_append_index)
AddIndexConstraintsOptimizer(metadata_snapshot).perform(cnf);

View File

@ -46,6 +46,8 @@ void OwnSplitChannel::log(const Poco::Message & msg)
void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
{
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
try
{
logSplit(msg);
@ -62,8 +64,6 @@ void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
/// but let's log it into the stderr at least.
catch (...)
{
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
const std::string & exception_message = getCurrentExceptionMessage(true);
const std::string & message = msg.getText();

View File

@ -1075,7 +1075,7 @@ void MergeTreeData::loadDataPartsFromDisk(
LOG_ERROR(log,
"Detaching broken part {}{} (size: {}). "
"If it happened after update, it is likely because of backward incompability. "
"If it happened after update, it is likely because of backward incompatibility. "
"You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name, part_size_str);
std::lock_guard loading_lock(mutex);
@ -1444,7 +1444,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
continue;
}
/// Check if CSNs were witten after committing transaction, update and write if needed.
/// Check if CSNs were written after committing transaction, update and write if needed.
bool version_updated = false;
chassert(!version.creation_tid.isEmpty());
if (!part->version.creation_csn)
@ -1867,18 +1867,18 @@ size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error, NameSet * parts_failed_to_delete)
{
NameSet part_names_successeded;
NameSet part_names_succeed;
auto get_failed_parts = [&part_names_successeded, &parts_failed_to_delete, &parts] ()
auto get_failed_parts = [&part_names_succeed, &parts_failed_to_delete, &parts] ()
{
if (part_names_successeded.size() == parts.size())
if (part_names_succeed.size() == parts.size())
return;
if (parts_failed_to_delete)
{
for (const auto & part : parts)
{
if (!part_names_successeded.contains(part->name))
if (!part_names_succeed.contains(part->name))
parts_failed_to_delete->insert(part->name);
}
}
@ -1886,7 +1886,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool
try
{
clearPartsFromFilesystemImpl(parts, &part_names_successeded);
clearPartsFromFilesystemImpl(parts, &part_names_succeed);
get_failed_parts();
}
catch (...)
@ -1898,7 +1898,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool
}
}
void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_to_remove, NameSet * part_names_successed)
void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_to_remove, NameSet * part_names_succeed)
{
const auto settings = getSettings();
if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
@ -1918,10 +1918,10 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();
if (part_names_successed)
if (part_names_succeed)
{
std::lock_guard lock(part_names_mutex);
part_names_successed->insert(part->name);
part_names_succeed->insert(part->name);
}
});
}
@ -1934,13 +1934,13 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
{
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();
if (part_names_successed)
part_names_successed->insert(part->name);
if (part_names_succeed)
part_names_succeed->insert(part->name);
}
}
}
size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirecory()
size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
{
/**
* Remove old (configured by setting) broken detached parts.
@ -2093,7 +2093,7 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_
{
/// Relies on storage path, so we drop it during rename
/// it will be recreated automatiaclly.
/// it will be recreated automatically.
std::lock_guard wal_lock(write_ahead_log_mutex);
if (write_ahead_log)
{
@ -3928,7 +3928,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception("Volume " + name + " does not exists on policy " + getStoragePolicy()->getName(), ErrorCodes::UNKNOWN_DISK);
if (parts.empty())
throw Exception("Nothing to move (сheck that the partition exists).", ErrorCodes::NO_SUCH_DATA_PART);
throw Exception("Nothing to move (check that the partition exists).", ErrorCodes::NO_SUCH_DATA_PART);
std::erase_if(parts, [&](auto part_ptr)
{
@ -6286,7 +6286,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
{
// Store metadata for replicated table.
// Do nothing for non-replocated.
// Do nothing for non-replicated.
createAndStoreFreezeMetadata(disk, part, fs::path(backup_part_path) / part->data_part_storage->getPartDirectory());
};
@ -6599,7 +6599,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
auto disk = moving_part.reserved_space->getDisk();
if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
/// If we acuqired lock than let's try to move. After one
/// If we acquired lock than let's try to move. After one
/// replica will actually move the part from disk to some
/// zero-copy storage other replicas will just fetch
/// metainformation.

View File

@ -177,7 +177,7 @@ public:
/// Rename map new_name -> old_name
std::unordered_map<String, String> rename_map;
bool isColumnRenamed(const String & new_name) const { return rename_map.count(new_name) > 0; }
bool isColumnRenamed(const String & new_name) const { return rename_map.contains(new_name); }
String getColumnOldName(const String & new_name) const { return rename_map.at(new_name); }
};
@ -634,7 +634,7 @@ public:
/// Delete WAL files containing parts, that all already stored on disk.
size_t clearOldWriteAheadLogs();
size_t clearOldBrokenPartsFromDetachedDirecory();
size_t clearOldBrokenPartsFromDetachedDirectory();
/// Delete all directories which names begin with "tmp"
/// Must be called with locked lockForShare() because it's using relative_data_path.
@ -761,7 +761,7 @@ public:
const ColumnsDescription & getObjectColumns() const { return object_columns; }
/// Creates desciprion of columns of data type Object from the range of data parts.
/// Creates description of columns of data type Object from the range of data parts.
static ColumnsDescription getObjectColumns(
const DataPartsVector & parts, const ColumnsDescription & storage_columns);
@ -1083,7 +1083,7 @@ protected:
DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;
/// Current descriprion of columns of data type Object.
/// Current description of columns of data type Object.
/// It changes only when set of parts is changed and is
/// protected by @data_parts_mutex.
ColumnsDescription object_columns;
@ -1125,7 +1125,7 @@ protected:
return {begin, end};
}
/// Creates desciprion of columns of data type Object from the range of data parts.
/// Creates description of columns of data type Object from the range of data parts.
static ColumnsDescription getObjectColumns(
boost::iterator_range<DataPartIteratorByStateAndInfo> range, const ColumnsDescription & storage_columns);
@ -1263,7 +1263,7 @@ private:
void checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const;
/// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
/// in precommitted state and to transasction
/// in precommitted state and to transaction
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, DataPartStorageBuilderPtr builder);
/// Low-level method for preparing parts for commit (in-memory).
@ -1351,7 +1351,7 @@ private:
/// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings.
/// If we fail to remove some part and throw_on_error equal to `true` will throw an exception on the first failed part.
/// Otherwise, in non-parallel case will break and return.
void clearPartsFromFilesystemImpl(const DataPartsVector & parts, NameSet * part_names_successed);
void clearPartsFromFilesystemImpl(const DataPartsVector & parts, NameSet * part_names_succeed);
TemporaryParts temporary_parts;
};

View File

@ -149,7 +149,7 @@ void ReplicatedMergeTreeAttachThread::runImpl()
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
storage.clearOldWriteAheadLogs();
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
storage.clearOldBrokenPartsFromDetachedDirecory();
storage.clearOldBrokenPartsFromDetachedDirectory();
storage.createNewZooKeeperNodes();
storage.syncPinnedPartUUIDs();

View File

@ -66,7 +66,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
storage.clearOldWriteAheadLogs();
storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
storage.clearOldBrokenPartsFromDetachedDirecory();
storage.clearOldBrokenPartsFromDetachedDirectory();
}
/// This is loose condition: no problem if we actually had lost leadership at this moment

View File

@ -1202,7 +1202,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
cleared_count += clearOldMutations();
cleared_count += clearEmptyParts();
if (getSettings()->merge_tree_enable_clear_old_broken_detached)
cleared_count += clearOldBrokenPartsFromDetachedDirecory();
cleared_count += clearOldBrokenPartsFromDetachedDirectory();
return cleared_count;
/// TODO maybe take into account number of cleared objects when calculating backoff
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);

View File

@ -4582,7 +4582,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
if (entry.alter_version < metadata_version)
{
/// TODO Can we replace it with LOGICAL_ERROR?
/// As for now, it may rerely happen due to reordering of ALTER_METADATA entries in the queue of
/// As for now, it may rarely happen due to reordering of ALTER_METADATA entries in the queue of
/// non-initial replica and also may happen after stale replica recovery.
LOG_WARNING(log, "Attempt to update metadata of version {} "
"to older version {} when processing log entry {}: {}",
@ -4664,7 +4664,7 @@ PartitionBlockNumbersHolder StorageReplicatedMergeTree::allocateBlockNumbersInAf
}
else
{
/// TODO: Implement optimal block number aqcuisition algorithm in multiple (but not all) partitions
/// TODO: Implement optimal block number acquisition algorithm in multiple (but not all) partitions
EphemeralLocksInAllPartitions lock_holder(
fs::path(zookeeper_path) / "block_numbers", "block-", fs::path(zookeeper_path) / "temp", *zookeeper);
@ -4841,7 +4841,7 @@ void StorageReplicatedMergeTree::alter(
Coordination::Responses results;
Coordination::Error rc = zookeeper->tryMulti(ops, results);
/// For the sake of constitency with mechanics of concurrent background process of assigning parts merge tasks
/// For the sake of consistency with mechanics of concurrent background process of assigning parts merge tasks
/// this placeholder must be held up until the moment of committing into ZK of the mutation entry
/// See ReplicatedMergeTreeMergePredicate::canMergeTwoParts() method
partition_block_numbers_holder.reset();
@ -5897,7 +5897,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
/// partitions, saves them in the mutation entry and writes the mutation entry to a new ZK node in
/// the /mutations folder. This block numbers are needed to determine which parts should be mutated and
/// which shouldn't (parts inserted after the mutation will have the block number higher than the
/// block number acquired by the mutation in that partition and so will not be mutatied).
/// block number acquired by the mutation in that partition and so will not be mutated).
/// This block number is called "mutation version" in that partition.
///
/// Mutation versions are acquired atomically in all partitions, so the case when an insert in some
@ -7217,7 +7217,7 @@ bool StorageReplicatedMergeTree::addOpsToDropAllPartsInPartition(
}
void StorageReplicatedMergeTree::dropAllPartsInPartitions(
zkutil::ZooKeeper & zookeeper, const Strings partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach)
zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach)
{
entries.reserve(partition_ids.size());
@ -7600,7 +7600,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
if (!children.empty())
{
LOG_TRACE(logger, "Found {} ({}) zookeper locks for {}", zookeeper_part_uniq_node, children.size(), fmt::join(children, ", "));
LOG_TRACE(logger, "Found {} ({}) zookeeper locks for {}", zookeeper_part_uniq_node, children.size(), fmt::join(children, ", "));
part_has_no_more_locks = false;
continue;
}
@ -7706,12 +7706,12 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / id;
Strings id_replicas;
zookeeper->tryGetChildren(zookeeper_part_uniq_node, id_replicas);
LOG_TRACE(log, "Found zookeper replicas for {}: {}", zookeeper_part_uniq_node, id_replicas.size());
LOG_TRACE(log, "Found zookeeper replicas for {}: {}", zookeeper_part_uniq_node, id_replicas.size());
replicas.insert(id_replicas.begin(), id_replicas.end());
}
}
LOG_TRACE(log, "Found zookeper replicas for part {}: {}", part.name, replicas.size());
LOG_TRACE(log, "Found zookeeper replicas for part {}: {}", part.name, replicas.size());
Strings active_replicas;
@ -7724,7 +7724,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
if ((replica != replica_name) && (zookeeper->exists(fs::path(zookeeper_path) / "replicas" / replica / "is_active")))
active_replicas.push_back(replica);
LOG_TRACE(log, "Found zookeper active replicas for part {}: {}", part.name, active_replicas.size());
LOG_TRACE(log, "Found zookeeper active replicas for part {}: {}", part.name, active_replicas.size());
if (active_replicas.empty())
return "";
@ -8159,7 +8159,7 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
if (!created)
{
String mode_str = mode == zkutil::CreateMode::Persistent ? "persistent" : "ephemral";
String mode_str = mode == zkutil::CreateMode::Persistent ? "persistent" : "ephemeral";
throw Exception(ErrorCodes::NOT_FOUND_NODE, "Cannot create {} zero copy lock {} because part was unlocked from zookeeper", mode_str, zookeeper_node);
}
}

View File

@ -754,7 +754,7 @@ private:
std::vector<EphemeralLockInZooKeeper> & delimiting_block_locks,
std::vector<size_t> & log_entry_ops_idx);
void dropAllPartsInPartitions(
zkutil::ZooKeeper & zookeeper, const Strings partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach);
zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach);
LogEntryPtr dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, ContextPtr query_context, bool detach);

View File

@ -69,13 +69,14 @@ public:
virtual ~ITableFunction() = default;
protected:
virtual AccessType getSourceAccessType() const;
private:
virtual StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;
virtual const char * getStorageTypeName() const = 0;
virtual AccessType getSourceAccessType() const;
};
using TableFunctionPtr = std::shared_ptr<ITableFunction>;

View File

@ -7,6 +7,8 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionHDFS.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/Context.h>
#include <Access/Common/AccessFlags.h>
namespace DB
{
@ -29,7 +31,10 @@ StoragePtr TableFunctionHDFS::getStorage(
ColumnsDescription TableFunctionHDFS::getActualTableStructure(ContextPtr context) const
{
if (structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
}
return parseColumnsListFromString(structure, context);
}

View File

@ -14,6 +14,7 @@
#include <TableFunctions/TableFunctionHDFS.h>
#include <TableFunctions/TableFunctionHDFSCluster.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Access/Common/AccessFlags.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST_fwd.h>
@ -74,7 +75,10 @@ void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, Conte
ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const
{
if (structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
}
return parseColumnsListFromString(structure, context);
}

View File

@ -8,6 +8,7 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Access/Common/AccessFlags.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageS3.h>
@ -133,6 +134,7 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context)
{
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(
configuration.format,
S3::URI(Poco::URI(configuration.url)),

View File

@ -15,6 +15,7 @@
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/TableFunctionS3Cluster.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Access/Common/AccessFlags.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/IAST_fwd.h>
@ -83,6 +84,7 @@ ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr co
{
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(
configuration.format,
S3::URI(Poco::URI(configuration.url)),

View File

@ -10,6 +10,7 @@
#include <Storages/StorageExternalDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
@ -113,12 +114,15 @@ ReadWriteBufferFromHTTP::HTTPHeaderEntries TableFunctionURL::getHeaders() const
ColumnsDescription TableFunctionURL::getActualTableStructure(ContextPtr context) const
{
if (structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageURL::getTableStructureFromData(format,
filename,
chooseCompressionMethod(Poco::URI(filename).getPath(), compression_method),
getHeaders(),
std::nullopt,
context);
}
return parseColumnsListFromString(structure, context);
}

View File

@ -247,6 +247,12 @@ CI_CONFIG = {
"Stateless tests (release, s3 storage)": {
"required_build": "package_release",
},
"Stateless tests (debug, s3 storage)": {
"required_build": "package_debug",
},
"Stateless tests (tsan, s3 storage)": {
"required_build": "package_tsan",
},
"Stress test (asan)": {
"required_build": "package_asan",
},

View File

@ -1,31 +1,26 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
uuids = []
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/conf.xml"], with_nginx=True
)
@pytest.fixture(scope="module")
def cluster():
@pytest.fixture(scope="module", autouse=True)
def setup_node():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node1", main_configs=["configs/conf.xml"], with_nginx=True
)
cluster.start()
yield cluster
node1.query(
"insert into table function url(url1) partition by column3 values (1, 2, 3), (3, 2, 1), (1, 3, 2)"
)
yield
finally:
cluster.shutdown()
def test_partition_by(cluster):
node1 = cluster.instances["node1"]
node1.query(
f"insert into table function url(url1) partition by column3 values (1, 2, 3), (3, 2, 1), (1, 3, 2)"
)
def test_partition_by():
result = node1.query(
f"select * from url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
)
@ -38,3 +33,45 @@ def test_partition_by(cluster):
f"select * from url('http://nginx:80/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
)
assert result.strip() == "1\t2\t3"
def test_table_function_url_access_rights():
node1.query("CREATE USER OR REPLACE u1")
expected_error = "necessary to have grant CREATE TEMPORARY TABLE, URL ON *.*"
assert expected_error in node1.query_and_get_error(
f"SELECT * FROM url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
user="u1",
)
expected_error = "necessary to have grant CREATE TEMPORARY TABLE, URL ON *.*"
assert expected_error in node1.query_and_get_error(
f"SELECT * FROM url('http://nginx:80/test_1', 'TSV')", user="u1"
)
assert node1.query(
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
user="u1",
) == TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
assert node1.query(
f"DESCRIBE TABLE url('http://nginx:80/not-exist', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
user="u1",
) == TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
expected_error = "necessary to have grant URL ON *.*"
assert expected_error in node1.query_and_get_error(
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV')", user="u1"
)
node1.query("GRANT URL ON *.* TO u1")
assert node1.query(
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV')",
user="u1",
) == TSV(
[
["c1", "Nullable(Int64)"],
["c2", "Nullable(Int64)"],
["c3", "Nullable(Int64)"],
]
)

View File

@ -1,35 +0,0 @@
<?xml version="1.0"?>
<clickhouse>
<logger>
<level>trace</level>
<console>true</console>
</logger>
<tcp_port>9000</tcp_port>
<path>./</path>
<mark_cache_size>0</mark_cache_size>
<users>
<default>
<password></password>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
<access_management>1</access_management>
</default>
</users>
<profiles>
<default/>
</profiles>
<quotas>
<default />
</quotas>
</clickhouse>

View File

@ -1,108 +0,0 @@
#!/usr/bin/env bash
# Tags: no-tsan, no-asan, no-ubsan, no-msan, no-parallel, no-fasttest
# Tag no-tsan: requires jemalloc to track small allocations
# Tag no-asan: requires jemalloc to track small allocations
# Tag no-ubsan: requires jemalloc to track small allocations
# Tag no-msan: requires jemalloc to track small allocations
#
# Regression for INSERT SELECT, that abnormally terminates the server
# in case of too small memory limits.
#
# NOTE: After #24483 had been merged the only place where the allocation may
# fail is the insert into PODArray in DB::OwnSplitChannel::log, but after
# #24069 those errors will be ignored, so to check new behaviour separate
# server is required.
#
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
server_opts=(
"--config-file=$CURDIR/$(basename "${BASH_SOURCE[0]}" .sh).config.xml"
"--"
# to avoid multiple listen sockets (complexity for port discovering)
"--listen_host=127.1"
# we will discover the real port later.
"--tcp_port=0"
"--shutdown_wait_unfinished=0"
)
CLICKHOUSE_WATCHDOG_ENABLE=0 $CLICKHOUSE_SERVER_BINARY "${server_opts[@]}" >clickhouse-server.log 2>clickhouse-server.stderr &
server_pid=$!
trap cleanup EXIT
function cleanup()
{
kill -9 $server_pid
echo "Test failed. Server log:"
cat clickhouse-server.log
cat clickhouse-server.stderr
rm -f clickhouse-server.log
rm -f clickhouse-server.stderr
exit 1
}
server_port=
i=0 retries=300
# wait until server will start to listen (max 30 seconds)
while [[ -z $server_port ]] && [[ $i -lt $retries ]]; do
server_port=$(lsof -n -a -P -i tcp -s tcp:LISTEN -p $server_pid 2>/dev/null | awk -F'[ :]' '/LISTEN/ { print $(NF-1) }')
((++i))
sleep 0.1
if ! kill -0 $server_pid >& /dev/null; then
echo "No server (pid $server_pid)"
break
fi
done
if [[ -z $server_port ]]; then
echo "Cannot wait for LISTEN socket" >&2
exit 1
fi
# wait for the server to start accepting tcp connections (max 30 seconds)
i=0 retries=300
while ! $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$server_port" --format Null -q 'select 1' 2>/dev/null && [[ $i -lt $retries ]]; do
sleep 0.1
if ! kill -0 $server_pid >& /dev/null; then
echo "No server (pid $server_pid)"
break
fi
done
if ! $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$server_port" --format Null -q 'select 1'; then
echo "Cannot wait until server will start accepting connections on <tcp_port>" >&2
exit 1
fi
# it is not mandatory to use existing table since it fails earlier, hence just a placeholder.
# this is format of INSERT SELECT, that pass these settings exactly for INSERT query not the SELECT
if $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$server_port" --format Null --send_logs_level=warning --max_memory_usage=1 --max_untracked_memory=1 -q 'insert into placeholder_table_name select * from numbers_mt(65535)' >& /dev/null; then
echo "INSERT SELECT should fail" >&2
exit 1
fi
# no sleep, since flushing to stderr should not be buffered.
if ! grep -E -q 'Cannot add message to the log: Code: 60.*placeholder_table_name' clickhouse-server.stderr; then
echo "Adding message to the log should fail" >&2
exit 1
fi
# check that server is still alive
$CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$server_port" --format Null -q 'SELECT 1'
# send TERM and save the error code to ensure that it is 0 (EXIT_SUCCESS)
kill $server_pid
wait $server_pid
return_code=$?
trap '' EXIT
if [ $return_code != 0 ]; then
cat clickhouse-server.log
cat clickhouse-server.stderr
fi
rm -f clickhouse-server.log
rm -f clickhouse-server.stderr
exit $return_code

View File

@ -361,6 +361,8 @@ int decompressFiles(int input_fd, char * path, char * name, bool & have_compress
#endif
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
uint32_t getInode(const char * self)
{
std::ifstream maps("/proc/self/maps");
@ -386,6 +388,8 @@ uint32_t getInode(const char * self)
return 0;
}
#endif
int main(int/* argc*/, char* argv[])
{
char self[4096] = {0};
@ -409,6 +413,7 @@ int main(int/* argc*/, char* argv[])
else
name = file_path;
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// get inode of this executable
uint32_t inode = getInode(self);
if (inode == 0)
@ -460,6 +465,7 @@ int main(int/* argc*/, char* argv[])
printf("No target executable - decompression only was performed.\n");
return 0;
}
#endif
int input_fd = open(self, O_RDONLY);
if (input_fd == -1)
@ -522,19 +528,21 @@ int main(int/* argc*/, char* argv[])
if (has_exec)
{
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// write one byte to the lock in case other copies of compressed are running to indicate that
/// execution should be performed
write(lock, "1", 1);
#endif
execv(self, argv);
/// This part of code will be reached only if error happened
perror("execv");
return 1;
}
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// since inodes can be reused - it's a precaution if lock file already exists and have size of 1
ftruncate(lock, 0);
#endif
printf("No target executable - decompression only was performed.\n");
}