Merge branch 'master' into rolling-upgrade

This commit is contained in:
Dan Roscigno 2022-12-09 07:10:35 -05:00 committed by GitHub
commit 5782a95ee6
117 changed files with 957 additions and 979 deletions

View File

@ -61,6 +61,7 @@ jobs:
committer: "robot-clickhouse <robot-clickhouse@users.noreply.github.com>"
commit-message: Update version_date.tsv and changelogs after ${{ env.GITHUB_TAG }}
branch: auto/${{ env.GITHUB_TAG }}
assignees: ${{ github.event.sender.login }} # assign the PR to the tag pusher
delete-branch: true
title: Update version_date.tsv and changelogs after ${{ env.GITHUB_TAG }}
labels: do not test

View File

@ -244,7 +244,7 @@ The username and password can be indicated in one of three ways:
$ echo 'SELECT 1' | curl 'http://user:password@localhost:8123/' -d @-
```
1. In the user and password URL parameters. Example:
2. In the user and password URL parameters (*We do not recommend using this method, as the parameters might be logged by a web proxy and cached in the browser*). Example:
<!-- -->
@ -252,7 +252,7 @@ $ echo 'SELECT 1' | curl 'http://user:password@localhost:8123/' -d @-
$ echo 'SELECT 1' | curl 'http://localhost:8123/?user=user&password=password' -d @-
```
1. Using X-ClickHouse-User and X-ClickHouse-Key headers. Example:
3. Using X-ClickHouse-User and X-ClickHouse-Key headers. Example:
<!-- -->
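A header-based call follows the same pattern as the curl examples above (a sketch using the same hypothetical credentials):

```
$ echo 'SELECT 1' | curl -H 'X-ClickHouse-User: user' -H 'X-ClickHouse-Key: password' 'http://localhost:8123/' -d @-
```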

View File

@ -7,7 +7,7 @@ sidebar_label: UPDATE
# ALTER TABLE … UPDATE Statements
``` sql
ALTER TABLE [db.]table [ON CLUSTER cluster] UPDATE column1 = expr1 [, ...] WHERE filter_expr
ALTER TABLE [db.]table [ON CLUSTER cluster] UPDATE column1 = expr1 [, ...] [IN PARTITION partition_id] WHERE filter_expr
```
Manipulates data matching the specified filtering expression. Implemented as a [mutation](/docs/en/sql-reference/statements/alter/index.md#mutations).
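A short illustration of the extended syntax (hypothetical table and partition id):

``` sql
-- Only rows in partition '201902' are scanned and mutated.
ALTER TABLE visits UPDATE hits = hits + 1 IN PARTITION '201902' WHERE user_id = 42
```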

View File

@ -1,4 +1,15 @@
set (CLICKHOUSE_DISKS_SOURCES DisksApp.cpp ICommand.cpp)
set (CLICKHOUSE_DISKS_SOURCES
DisksApp.cpp
ICommand.cpp
CommandCopy.cpp
CommandLink.cpp
CommandList.cpp
CommandListDisks.cpp
CommandMkDir.cpp
CommandMove.cpp
CommandRead.cpp
CommandRemove.cpp
CommandWrite.cpp)
set (CLICKHOUSE_DISKS_LINK
PRIVATE

View File

@ -1,7 +1,6 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
#include <Common/TerminalSize.h>
namespace DB
{
@ -11,7 +10,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandCopy : public ICommand
class CommandCopy final : public ICommand
{
public:
CommandCopy()
@ -51,16 +50,16 @@ public:
String disk_name_from = config.getString("diskFrom", config.getString("disk", "default"));
String disk_name_to = config.getString("diskTo", config.getString("disk", "default"));
String path_from = command_arguments[0];
String path_to = command_arguments[1];
const String & path_from = command_arguments[0];
const String & path_to = command_arguments[1];
DiskPtr disk_from = global_context->getDisk(disk_name_from);
DiskPtr disk_to = global_context->getDisk(disk_name_to);
String full_path_from = fullPathWithValidate(disk_from, path_from);
String full_path_to = fullPathWithValidate(disk_to, path_to);
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);
disk_from->copy(full_path_from, disk_to, full_path_to);
disk_from->copy(relative_path_from, disk_to, relative_path_to);
}
};
}

View File

@ -1,5 +1,3 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
@ -11,7 +9,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandLink : public ICommand
class CommandLink final : public ICommand
{
public:
CommandLink()
@ -40,15 +38,15 @@ public:
String disk_name = config.getString("disk", "default");
String path_from = command_arguments[0];
String path_to = command_arguments[1];
const String & path_from = command_arguments[0];
const String & path_to = command_arguments[1];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path_from = fullPathWithValidate(disk, path_from);
String full_path_to = fullPathWithValidate(disk, path_to);
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);
disk->createHardLink(full_path_from, full_path_to);
disk->createHardLink(relative_path_from, relative_path_to);
}
};
}

View File

@ -1,7 +1,6 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
#include <Common/TerminalSize.h>
namespace DB
{
@ -11,7 +10,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandList : public ICommand
class CommandList final : public ICommand
{
public:
CommandList()
@ -46,43 +45,47 @@ public:
String disk_name = config.getString("disk", "default");
String path = command_arguments[0];
const String & path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
String relative_path = validatePathAndGetAsRelative(path);
bool recursive = config.getBool("recursive", false);
if (recursive)
listRecursive(disk, full_path);
listRecursive(disk, relative_path);
else
list(disk, full_path);
list(disk, relative_path);
}
private:
static void list(const DiskPtr & disk, const std::string & full_path)
static void list(const DiskPtr & disk, const std::string & relative_path)
{
std::vector<String> file_names;
disk->listFiles(full_path, file_names);
disk->listFiles(relative_path, file_names);
for (const auto & file_name : file_names)
std::cout << file_name << '\n';
}
static void listRecursive(const DiskPtr & disk, const std::string & full_path)
static void listRecursive(const DiskPtr & disk, const std::string & relative_path)
{
std::vector<String> file_names;
disk->listFiles(full_path, file_names);
disk->listFiles(relative_path, file_names);
std::cout << full_path << ":\n";
for (const auto & file_name : file_names)
std::cout << file_name << '\n';
std::cout << "\n";
std::cout << relative_path << ":\n";
if (!file_names.empty())
{
for (const auto & file_name : file_names)
std::cout << file_name << '\n';
std::cout << "\n";
}
for (const auto & file_name : file_names)
{
auto path = full_path + "/" + file_name;
auto path = relative_path + "/" + file_name;
if (disk->isDirectory(path))
listRecursive(disk, path);
}

View File

@ -1,5 +1,3 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
@ -11,7 +9,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandListDisks : public ICommand
class CommandListDisks final : public ICommand
{
public:
CommandListDisks()

View File

@ -1,7 +1,7 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
#include <Common/TerminalSize.h>
namespace DB
{
@ -11,7 +11,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandMkDir : public ICommand
class CommandMkDir final : public ICommand
{
public:
CommandMkDir()
@ -46,17 +46,17 @@ public:
String disk_name = config.getString("disk", "default");
String path = command_arguments[0];
const String & path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
String relative_path = validatePathAndGetAsRelative(path);
bool recursive = config.getBool("recursive", false);
if (recursive)
disk->createDirectories(full_path);
disk->createDirectories(relative_path);
else
disk->createDirectory(full_path);
disk->createDirectory(relative_path);
}
};
}

View File

@ -1,5 +1,3 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
@ -11,7 +9,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandMove : public ICommand
class CommandMove final : public ICommand
{
public:
CommandMove()
@ -39,18 +37,18 @@ public:
String disk_name = config.getString("disk", "default");
String path_from = command_arguments[0];
String path_to = command_arguments[1];
const String & path_from = command_arguments[0];
const String & path_to = command_arguments[1];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path_from = fullPathWithValidate(disk, path_from);
String full_path_to = fullPathWithValidate(disk, path_to);
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);
if (disk->isFile(full_path_from))
disk->moveFile(full_path_from, full_path_to);
if (disk->isFile(relative_path_from))
disk->moveFile(relative_path_from, relative_path_to);
else
disk->moveDirectory(full_path_from, full_path_to);
disk->moveDirectory(relative_path_from, relative_path_to);
}
};
}

View File

@ -1,7 +1,9 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Common/TerminalSize.h>
namespace DB
{
@ -11,7 +13,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandRead : public ICommand
class CommandRead final : public ICommand
{
public:
CommandRead()
@ -46,27 +48,25 @@ public:
String disk_name = config.getString("disk", "default");
String path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
String relative_path = validatePathAndGetAsRelative(command_arguments[0]);
String path_output = config.getString("output", "");
if (!path_output.empty())
{
String full_path_output = fullPathWithValidate(disk, path_output);
String relative_path_output = validatePathAndGetAsRelative(path_output);
auto in = disk->readFile(full_path);
auto out = disk->writeFile(full_path_output);
auto in = disk->readFile(relative_path);
auto out = disk->writeFile(relative_path_output);
copyData(*in, *out);
out->finalize();
return;
}
else
{
auto in = disk->readFile(full_path);
auto in = disk->readFile(relative_path);
std::unique_ptr<WriteBufferFromFileBase> out = std::make_unique<WriteBufferFromFileDescriptor>(STDOUT_FILENO);
copyData(*in, *out);
}

View File

@ -1,5 +1,3 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
@ -11,7 +9,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandRemove : public ICommand
class CommandRemove final : public ICommand
{
public:
CommandRemove()
@ -39,13 +37,13 @@ public:
String disk_name = config.getString("disk", "default");
String path = command_arguments[0];
const String & path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
String relative_path = validatePathAndGetAsRelative(path);
disk->removeRecursive(full_path);
disk->removeRecursive(relative_path);
}
};
}

View File

@ -1,8 +1,11 @@
#pragma once
#include "ICommand.h"
#include <Interpreters/Context.h>
#include <Common/TerminalSize.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
namespace DB
{
@ -11,7 +14,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
class CommandWrite : public ICommand
class CommandWrite final : public ICommand
{
public:
CommandWrite()
@ -46,11 +49,11 @@ public:
String disk_name = config.getString("disk", "default");
String path = command_arguments[0];
const String & path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
String relative_path = validatePathAndGetAsRelative(path);
String path_input = config.getString("input", "");
std::unique_ptr<ReadBufferFromFileBase> in;
@ -60,11 +63,11 @@ public:
}
else
{
String full_path_input = fullPathWithValidate(disk, path_input);
in = disk->readFile(full_path_input);
String relative_path_input = validatePathAndGetAsRelative(path_input);
in = disk->readFile(relative_path_input);
}
auto out = disk->writeFile(full_path);
auto out = disk->writeFile(relative_path);
copyData(*in, *out);
out->finalize();
}

View File

@ -1,11 +1,12 @@
#include "DisksApp.h"
#include "ICommand.h"
#include <Disks/registerDisks.h>
#include <base/argsToConfig.h>
#include <Common/TerminalSize.h>
#include <Formats/registerFormats.h>
namespace DB
{

View File

@ -1,28 +1,22 @@
#pragma once
#include "CommandCopy.cpp"
#include "CommandLink.cpp"
#include "CommandList.cpp"
#include "CommandListDisks.cpp"
#include "CommandMkDir.cpp"
#include "CommandMove.cpp"
#include "CommandRead.cpp"
#include "CommandRemove.cpp"
#include "CommandWrite.cpp"
#include <Loggers/Loggers.h>
#include <Common/ProgressIndication.h>
#include <Common/StatusFile.h>
#include <Common/InterruptListener.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Poco/Util/Application.h>
#include <boost/program_options.hpp>
namespace DB
{
class ICommand;
using CommandPtr = std::unique_ptr<ICommand>;
namespace po = boost::program_options;
using ProgramOptionsDescription = boost::program_options::options_description;
using CommandLineOptions = boost::program_options::variables_map;
class DisksApp : public Poco::Util::Application, public Loggers
{
public:

View File

@ -30,19 +30,21 @@ void ICommand::addOptions(ProgramOptionsDescription & options_description)
options_description.add(*command_option_description);
}
String ICommand::fullPathWithValidate(const DiskPtr & disk, const String & path)
String ICommand::validatePathAndGetAsRelative(const String & path)
{
if (fs::path(path).lexically_normal().string() != path)
/// If the path contains non-normalized parts like '.' we normalize them. If the resulting normalized path
/// still contains '..' it can be dangerous, so disallow such paths. Also, since clickhouse-disks
/// is not an interactive program (it does not track your current path), it is OK to disallow '..' paths.
String lexically_normal_path = fs::path(path).lexically_normal();
if (lexically_normal_path.find("..") != std::string::npos)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Path {} is not normalized", path);
String disk_path = fs::canonical(fs::path(disk->getPath())) / "";
String full_path = (fs::absolute(disk_path) / path).lexically_normal();
/// If the path is absolute, keep it as relative inside the disk, so the disk looks like
/// an ordinary filesystem with a root.
if (fs::path(lexically_normal_path).is_absolute())
return lexically_normal_path.substr(1);
if (!full_path.starts_with(disk_path))
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS, "Path {} must be inside disk path {}", full_path, disk_path);
return path;
return lexically_normal_path;
}
}
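To make the new contract concrete, here is a minimal standalone sketch of the rules above (a hypothetical restatement for illustration, not the repository code):

```cpp
#include <cassert>
#include <filesystem>
#include <stdexcept>
#include <string>

namespace fs = std::filesystem;

// Hypothetical standalone restatement of validatePathAndGetAsRelative.
std::string validatePathAndGetAsRelative(const std::string & path)
{
    std::string normal = fs::path(path).lexically_normal().string();
    if (normal.find("..") != std::string::npos)  // "../x" survives normalization
        throw std::invalid_argument("Path is not normalized: " + path);
    if (fs::path(normal).is_absolute())          // "/a/./b" -> "a/b"
        return normal.substr(1);
    return normal;
}

int main()
{
    assert(validatePathAndGetAsRelative("/data/./table") == "data/table");
    assert(validatePathAndGetAsRelative("a/b/../c") == "a/c");  // ".." resolved away
    try { validatePathAndGetAsRelative("../secret"); assert(false); }
    catch (const std::invalid_argument &) { /* rejected as expected */ }
}
```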

View File

@ -2,16 +2,10 @@
#include <Disks/IDisk.h>
#include <Poco/Util/Application.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/copyData.h>
#include <boost/program_options.hpp>
#include <Common/TerminalSize.h>
#include <Common/Config/ConfigProcessor.h>
#include <Poco/Util/Application.h>
#include <memory>
@ -43,7 +37,7 @@ public:
protected:
void printHelpMessage() const;
static String fullPathWithValidate(const DiskPtr & disk, const String & path);
static String validatePathAndGetAsRelative(const String & path);
public:
String command_name;
@ -55,14 +49,16 @@ protected:
po::positional_options_description positional_options_description;
};
using CommandPtr = std::unique_ptr<ICommand>;
}
std::unique_ptr <DB::ICommand> makeCommandCopy();
std::unique_ptr <DB::ICommand> makeCommandLink();
std::unique_ptr <DB::ICommand> makeCommandList();
std::unique_ptr <DB::ICommand> makeCommandListDisks();
std::unique_ptr <DB::ICommand> makeCommandMove();
std::unique_ptr <DB::ICommand> makeCommandRead();
std::unique_ptr <DB::ICommand> makeCommandRemove();
std::unique_ptr <DB::ICommand> makeCommandWrite();
std::unique_ptr <DB::ICommand> makeCommandMkDir();
DB::CommandPtr makeCommandCopy();
DB::CommandPtr makeCommandLink();
DB::CommandPtr makeCommandList();
DB::CommandPtr makeCommandListDisks();
DB::CommandPtr makeCommandMove();
DB::CommandPtr makeCommandRead();
DB::CommandPtr makeCommandRemove();
DB::CommandPtr makeCommandWrite();
DB::CommandPtr makeCommandMkDir();

View File

@ -606,6 +606,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(UInt64, async_insert_max_query_number, 450, "Maximum number of insert queries collected per batch before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
\
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \

View File

@ -18,7 +18,6 @@
#include <sys/stat.h>
#include <Disks/DiskFactory.h>
#include <Disks/DiskMemory.h>
#include <Disks/DiskRestartProxy.h>
#include <Common/randomSeed.h>
#include <IO/ReadHelpers.h>

View File

@ -1,481 +0,0 @@
#include "DiskMemory.h"
#include "DiskFactory.h"
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
#include <Disks/ObjectStorages/LocalObjectStorage.h>
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int FILE_DOESNT_EXIST;
extern const int FILE_ALREADY_EXISTS;
extern const int DIRECTORY_DOESNT_EXIST;
extern const int CANNOT_DELETE_DIRECTORY;
}
class DiskMemoryDirectoryIterator final : public IDirectoryIterator
{
public:
explicit DiskMemoryDirectoryIterator(std::vector<fs::path> && dir_file_paths_)
: dir_file_paths(std::move(dir_file_paths_)), iter(dir_file_paths.begin())
{
}
void next() override { ++iter; }
bool isValid() const override { return iter != dir_file_paths.end(); }
String path() const override { return iter->string(); }
String name() const override { return iter->filename(); }
private:
std::vector<fs::path> dir_file_paths;
std::vector<fs::path>::iterator iter;
};
/// Adapter that behaves like ReadBufferFromString.
class ReadIndirectBuffer final : public ReadBufferFromFileBase
{
public:
ReadIndirectBuffer(String path_, const String & data_)
: impl(ReadBufferFromString(data_)), path(std::move(path_))
{
internal_buffer = impl.buffer();
working_buffer = internal_buffer;
pos = working_buffer.begin();
}
std::string getFileName() const override { return path; }
off_t seek(off_t off, int whence) override
{
impl.swap(*this);
off_t result = impl.seek(off, whence);
impl.swap(*this);
return result;
}
off_t getPosition() override { return pos - working_buffer.begin(); }
private:
ReadBufferFromString impl;
const String path;
};
/// This class is responsible for updating file metadata after the buffer is finalized.
class WriteIndirectBuffer final : public WriteBufferFromFileBase
{
public:
WriteIndirectBuffer(DiskMemory * disk_, String path_, WriteMode mode_, size_t buf_size)
: WriteBufferFromFileBase(buf_size, nullptr, 0), disk(disk_), path(std::move(path_)), mode(mode_)
{
}
~WriteIndirectBuffer() override
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void finalizeImpl() override
{
if (impl.isFinished())
return;
next();
/// str() finalizes buffer.
String value = impl.str();
std::lock_guard lock(disk->mutex);
auto iter = disk->files.find(path);
if (iter == disk->files.end())
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
/// Resize to the actual number of bytes written to string.
value.resize(count());
if (mode == WriteMode::Rewrite)
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, value});
else if (mode == WriteMode::Append)
disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, iter->second.data + value});
}
std::string getFileName() const override { return path; }
void sync() override {}
private:
void nextImpl() override
{
if (!offset())
return;
impl.write(working_buffer.begin(), offset());
}
WriteBufferFromOwnString impl;
DiskMemory * disk;
const String path;
const WriteMode mode;
};
DiskMemory::DiskMemory(const String & name_)
: IDisk(name_)
, disk_path("memory(" + name_ + ')')
{}
ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/)
{
throw Exception("Method reserve is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
UInt64 DiskMemory::getTotalSpace() const
{
return 0;
}
UInt64 DiskMemory::getAvailableSpace() const
{
return 0;
}
UInt64 DiskMemory::getUnreservedSpace() const
{
return 0;
}
bool DiskMemory::exists(const String & path) const
{
std::lock_guard lock(mutex);
return files.find(path) != files.end();
}
bool DiskMemory::isFile(const String & path) const
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
return false;
return iter->second.type == FileType::File;
}
bool DiskMemory::isDirectory(const String & path) const
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
return false;
return iter->second.type == FileType::Directory;
}
size_t DiskMemory::getFileSize(const String & path) const
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
return iter->second.data.length();
}
void DiskMemory::createDirectory(const String & path)
{
std::lock_guard lock(mutex);
if (files.find(path) != files.end())
return;
String parent_path = parentPath(path);
if (!parent_path.empty() && files.find(parent_path) == files.end())
throw Exception(
"Failed to create directory '" + path + "'. Parent directory " + parent_path + " does not exist",
ErrorCodes::DIRECTORY_DOESNT_EXIST);
files.emplace(path, FileData{FileType::Directory});
}
void DiskMemory::createDirectories(const String & path)
{
std::lock_guard lock(mutex);
createDirectoriesImpl(path);
}
void DiskMemory::createDirectoriesImpl(const String & path)
{
if (files.find(path) != files.end())
return;
String parent_path = parentPath(path);
if (!parent_path.empty())
createDirectoriesImpl(parent_path);
files.emplace(path, FileData{FileType::Directory});
}
void DiskMemory::clearDirectory(const String & path)
{
std::lock_guard lock(mutex);
if (files.find(path) == files.end())
throw Exception("Directory '" + path + "' does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
for (auto iter = files.begin(); iter != files.end();)
{
if (parentPath(iter->first) != path)
{
++iter;
continue;
}
if (iter->second.type == FileType::Directory)
throw Exception(
"Failed to clear directory '" + path + "'. " + iter->first + " is a directory", ErrorCodes::CANNOT_DELETE_DIRECTORY);
iter = files.erase(iter);
}
}
void DiskMemory::moveDirectory(const String & /*from_path*/, const String & /*to_path*/)
{
throw Exception("Method moveDirectory is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
DirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path) const
{
std::lock_guard lock(mutex);
if (!path.empty() && files.find(path) == files.end())
throw Exception("Directory '" + path + "' does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
std::vector<fs::path> dir_file_paths;
for (const auto & file : files)
if (parentPath(file.first) == path)
dir_file_paths.emplace_back(file.first);
return std::make_unique<DiskMemoryDirectoryIterator>(std::move(dir_file_paths));
}
void DiskMemory::moveFile(const String & from_path, const String & to_path)
{
std::lock_guard lock(mutex);
if (files.find(to_path) != files.end())
throw Exception(
"Failed to move file from " + from_path + " to " + to_path + ". File " + to_path + " already exist",
ErrorCodes::FILE_ALREADY_EXISTS);
replaceFileImpl(from_path, to_path);
}
void DiskMemory::replaceFile(const String & from_path, const String & to_path)
{
std::lock_guard lock(mutex);
replaceFileImpl(from_path, to_path);
}
void DiskMemory::replaceFileImpl(const String & from_path, const String & to_path)
{
String to_parent_path = parentPath(to_path);
if (!to_parent_path.empty() && files.find(to_parent_path) == files.end())
throw Exception(
"Failed to move file from " + from_path + " to " + to_path + ". Directory " + to_parent_path + " does not exist",
ErrorCodes::DIRECTORY_DOESNT_EXIST);
auto iter = files.find(from_path);
if (iter == files.end())
throw Exception(
"Failed to move file from " + from_path + " to " + to_path + ". File " + from_path + " does not exist",
ErrorCodes::FILE_DOESNT_EXIST);
auto node = files.extract(iter);
node.key() = to_path;
files.insert(std::move(node));
}
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, const ReadSettings &, std::optional<size_t>, std::optional<size_t>) const
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST);
return std::make_unique<ReadIndirectBuffer>(path, iter->second.data);
}
std::unique_ptr<WriteBufferFromFileBase> DiskMemory::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings &)
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
{
String parent_path = parentPath(path);
if (!parent_path.empty() && files.find(parent_path) == files.end())
throw Exception(
"Failed to create file '" + path + "'. Directory " + parent_path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
files.emplace(path, FileData{FileType::File});
}
return std::make_unique<WriteIndirectBuffer>(this, path, mode, buf_size);
}
void DiskMemory::removeFile(const String & path)
{
std::lock_guard lock(mutex);
auto file_it = files.find(path);
if (file_it == files.end())
throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
if (file_it->second.type == FileType::Directory)
throw Exception("Path '" + path + "' is a directory", ErrorCodes::CANNOT_DELETE_DIRECTORY);
else
files.erase(file_it);
}
void DiskMemory::removeDirectory(const String & path)
{
std::lock_guard lock(mutex);
auto file_it = files.find(path);
if (file_it == files.end())
throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
if (file_it->second.type == FileType::Directory)
{
files.erase(file_it);
if (std::any_of(files.begin(), files.end(), [path](const auto & file) { return parentPath(file.first) == path; }))
throw Exception("Directory '" + path + "' is not empty", ErrorCodes::CANNOT_DELETE_DIRECTORY);
}
else
{
throw Exception("Path '" + path + "' is not a directory", ErrorCodes::CANNOT_DELETE_DIRECTORY);
}
}
void DiskMemory::removeFileIfExists(const String & path)
{
std::lock_guard lock(mutex);
auto file_it = files.find(path);
if (file_it == files.end())
return;
if (file_it->second.type == FileType::Directory)
throw Exception("Path '" + path + "' is a directory", ErrorCodes::CANNOT_DELETE_DIRECTORY);
else
files.erase(file_it);
}
void DiskMemory::removeRecursive(const String & path)
{
std::lock_guard lock(mutex);
auto file_it = files.find(path);
if (file_it == files.end())
throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
for (auto iter = files.begin(); iter != files.end();)
{
if (iter->first.size() >= path.size() && std::string_view(iter->first.data(), path.size()) == path)
iter = files.erase(iter);
else
++iter;
}
}
void DiskMemory::listFiles(const String & path, std::vector<String> & file_names) const
{
std::lock_guard lock(mutex);
for (auto it = iterateDirectory(path); it->isValid(); it->next())
file_names.push_back(it->name());
}
void DiskMemory::createHardLink(const String &, const String &)
{
throw Exception("Method createHardLink is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskMemory::createFile(const String &)
{
throw Exception("Method createFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskMemory::setReadOnly(const String &)
{
throw Exception("Method setReadOnly is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskMemory::truncateFile(const String & path, size_t size)
{
std::lock_guard lock(mutex);
auto file_it = files.find(path);
if (file_it == files.end())
throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
file_it->second.data.resize(size);
}
MetadataStoragePtr DiskMemory::getMetadataStorage()
{
auto object_storage = std::make_shared<LocalObjectStorage>();
return std::make_shared<FakeMetadataStorageFromDisk>(
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
}
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;
void registerDiskMemory(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskMemory>(name);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("memory", creator);
}
}

View File

@ -1,127 +0,0 @@
#pragma once
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <Disks/IDisk.h>
namespace DB
{
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
/** Implementation of Disk intended only for testing purposes.
* All filesystem objects are stored in memory and lost on server restart.
*
* NOTE Work in progress. Currently the interface is not viable enough to support MergeTree or even StripeLog tables.
* Please delete this interface if it is not finished after 2020-06-18.
*/
class DiskMemory : public IDisk
{
public:
explicit DiskMemory(const String & name_);
const String & getPath() const override { return disk_path; }
ReservationPtr reserve(UInt64 bytes) override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) const override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size,
WriteMode mode,
const WriteSettings & settings) override;
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void setLastModified(const String &, const Poco::Timestamp &) override {}
Poco::Timestamp getLastModified(const String &) const override { return Poco::Timestamp(); }
time_t getLastChanged(const String &) const override { return {}; }
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
DataSourceDescription getDataSourceDescription() const override { return DataSourceDescription{DataSourceType::RAM, "", false, false}; }
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }
MetadataStoragePtr getMetadataStorage() override;
private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);
friend class WriteIndirectBuffer;
enum class FileType
{
File,
Directory
};
struct FileData
{
FileType type;
String data;
FileData(FileType type_, String data_) : type(type_), data(std::move(data_)) {}
explicit FileData(FileType type_) : type(type_) {}
};
using Files = std::unordered_map<String, FileData>; /// file path -> file data
const String disk_path;
Files files;
mutable std::mutex mutex;
};
}

View File

@ -463,6 +463,8 @@ inline String fullPath(const DiskPtr & disk, const String & path)
/// Return parent path for the specified path.
inline String parentPath(const String & path)
{
if (path == "/")
return "/";
if (path.ends_with('/'))
return fs::path(path).parent_path().parent_path() / "";
return fs::path(path).parent_path() / "";
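A minimal sketch of the expected behaviour, including the new root special case (a hypothetical standalone copy of the helper, for illustration):

```cpp
#include <cassert>
#include <filesystem>
#include <string>

namespace fs = std::filesystem;

// Hypothetical standalone copy of parentPath.
std::string parentPath(const std::string & path)
{
    if (path == "/")
        return "/";                       // new: the root is its own parent
    if (path.ends_with('/'))              // "a/b/" -> parent of "a/b" -> "a/"
        return fs::path(path).parent_path().parent_path() / "";
    return fs::path(path).parent_path() / "";  // "a/b" -> "a/"
}

int main()
{
    assert(parentPath("/") == "/");
    assert(parentPath("a/b/") == "a/");
    assert(parentPath("a/b") == "a/");
}
```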

View File

@ -8,7 +8,6 @@ namespace DB
{
void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check);
void registerDiskMemory(DiskFactory & factory, bool global_skip_access_check);
#if USE_AWS_S3
void registerDiskS3(DiskFactory & factory, bool global_skip_access_check);
@ -35,7 +34,6 @@ void registerDisks(bool global_skip_access_check)
auto & factory = DiskFactory::instance();
registerDiskLocal(factory, global_skip_access_check);
registerDiskMemory(factory, global_skip_access_check);
#if USE_AWS_S3
registerDiskS3(factory, global_skip_access_check);

View File

@ -10,11 +10,6 @@ namespace fs = std::filesystem;
template <typename T>
DB::DiskPtr createDisk();
template <>
DB::DiskPtr createDisk<DB::DiskMemory>()
{
return std::make_shared<DB::DiskMemory>("memory_disk");
}
template <>
DB::DiskPtr createDisk<DB::DiskLocal>()
@ -30,11 +25,6 @@ void destroyDisk(DB::DiskPtr & disk)
disk.reset();
}
template <>
void destroyDisk<DB::DiskMemory>(DB::DiskPtr & disk)
{
disk.reset();
}
template <>
void destroyDisk<DB::DiskLocal>(DB::DiskPtr & disk)
@ -55,7 +45,7 @@ public:
};
using DiskImplementations = testing::Types<DB::DiskMemory, DB::DiskLocal>;
using DiskImplementations = testing::Types<DB::DiskLocal>;
TYPED_TEST_SUITE(DiskTest, DiskImplementations);

View File

@ -1,15 +1,11 @@
#pragma once
#include <Disks/DiskLocal.h>
#include <Disks/DiskMemory.h>
#include <Disks/IDisk.h>
template <typename T>
DB::DiskPtr createDisk();
template <>
DB::DiskPtr createDisk<DB::DiskMemory>();
template <>
DB::DiskPtr createDisk<DB::DiskLocal>();
@ -18,6 +14,3 @@ void destroyDisk(DB::DiskPtr & disk);
template <>
void destroyDisk<DB::DiskLocal>(DB::DiskPtr & disk);
template <>
void destroyDisk<DB::DiskMemory>(DB::DiskPtr & disk);

View File

@ -235,6 +235,7 @@ std::future<void> AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_c
assert(data);
data->size_in_bytes += entry_data_size;
++data->query_number;
data->entries.emplace_back(entry);
insert_future = entry->getFuture();
@ -244,7 +245,7 @@ std::future<void> AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_c
/// Here we check whether we hit the limit on the maximum data size or the number of queries in the buffer.
/// We use the settings from the query context:
/// this works because queries with the same set of settings are already grouped together.
if (data->size_in_bytes > key.settings.async_insert_max_data_size)
if (data->size_in_bytes > key.settings.async_insert_max_data_size || data->query_number > key.settings.async_insert_max_query_number)
{
data_to_process = std::move(data);
shard.iterators.erase(it);
@ -399,11 +400,13 @@ try
StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
std::unique_ptr<ReadBuffer> last_buffer;
auto chunk_info = std::make_shared<ChunkOffsets>();
for (const auto & entry : data->entries)
{
auto buffer = std::make_unique<ReadBufferFromString>(entry->bytes);
current_entry = entry;
total_rows += executor.execute(*buffer);
chunk_info->offsets.push_back(total_rows);
/// Keep the buffer, because it can still be used
/// in the destructor while the buffer is reset at the next iteration.
@ -444,6 +447,7 @@ try
try
{
auto chunk = Chunk(executor.getResultColumns(), total_rows);
chunk.setChunkInfo(std::move(chunk_info));
size_t total_bytes = chunk.bytes();
auto source = std::make_shared<SourceFromSingleChunk>(header, std::move(chunk));

View File

@ -64,7 +64,9 @@ private:
using EntryPtr = std::shared_ptr<Entry>;
std::list<EntryPtr> entries;
size_t size_in_bytes = 0;
size_t query_number = 0;
};
using InsertDataPtr = std::unique_ptr<InsertData>;

View File

@ -285,7 +285,7 @@ Chain InterpreterInsertQuery::buildChainImpl(
/// Do not squash blocks if it is a sync INSERT into Distributed, since it leads to double buffering on the client and server side.
/// Client-side buffering might cause excessive timeouts (especially in the case of big blocks).
if (!(settings.insert_distributed_sync && table->isRemote()) && !no_squash && !(query && query->watch))
if (!(settings.insert_distributed_sync && table->isRemote()) && !async_insert && !no_squash && !(query && query->watch))
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();

View File

@ -113,6 +113,17 @@ private:
using Chunks = std::vector<Chunk>;
/// ChunkOffsets marks the offsets of the sub-chunks inside a chunk; it is used by async inserts.
class ChunkOffsets : public ChunkInfo
{
public:
ChunkOffsets() = default;
explicit ChunkOffsets(const std::vector<size_t> & offsets_) : offsets(offsets_) {}
std::vector<size_t> offsets;
};
using ChunkOffsetsPtr = std::shared_ptr<ChunkOffsets>;
/// Extension to support delayed defaults. AddingDefaultsProcessor uses it to replace missing values with column defaults.
class ChunkMissingValues : public ChunkInfo
{

View File

@ -39,6 +39,7 @@ void ConvertingTransform::onConsume(Chunk chunk)
expression->execute(block, num_rows);
chunk.setColumns(block.getColumns(), num_rows);
chunk.setChunkInfo(chunk.getChunkInfo());
cur_chunk = std::move(chunk);
}

View File

@ -13,16 +13,19 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_)
: zookeeper(zookeeper_), path_prefix(path_prefix_), path(path_)
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_, const String & conflict_path_)
: zookeeper(zookeeper_), path_prefix(path_prefix_), path(path_), conflict_path(conflict_path_)
{
if (path.size() <= path_prefix.size())
if (conflict_path.empty() && path.size() <= path_prefix.size())
throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
}
template <typename T>
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path)
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path)
{
constexpr bool async_insert = std::is_same_v<T, std::vector<String>>;
String path;
if (deduplication_path.empty())
@ -36,14 +39,38 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
/// Check for duplicates in advance, to avoid superfluous block number allocation
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
if constexpr (async_insert)
{
for (const auto & single_dedup_path : deduplication_path)
{
ops.emplace_back(zkutil::makeCreateRequest(single_dedup_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(single_dedup_path, -1));
}
}
else
{
ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
}
ops.emplace_back(zkutil::makeCreateRequest(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential));
Coordination::Responses responses;
Coordination::Error e = zookeeper_->tryMulti(ops, responses);
if (e != Coordination::Error::ZOK)
if (e == Coordination::Error::ZNODEEXISTS)
{
if (responses[0]->error == Coordination::Error::ZNODEEXISTS)
if constexpr (async_insert)
{
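/// For async inserts the ops vector is laid out as
/// [create d0, remove d0, create d1, remove d1, ..., create block number],
/// so a failed index below deduplication_path.size() * 2 maps to
/// deduplication path failed_idx / 2.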
auto failed_idx = zkutil::getFailedOpIndex(Coordination::Error::ZNODEEXISTS, responses);
if (failed_idx < deduplication_path.size() * 2)
{
const String & failed_op_path = deduplication_path[failed_idx / 2];
LOG_DEBUG(
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
"Deduplication path already exists: deduplication_path={}",
failed_op_path);
return EphemeralLockInZooKeeper{"", nullptr, "", failed_op_path};
}
}
else if (responses[0]->error == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
@ -51,11 +78,12 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
deduplication_path);
return {};
}
else
{
zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception
throw Exception("Unable to handle error {} when acquiring ephemeral lock in ZK", ErrorCodes::LOGICAL_ERROR);
}
}
if (e != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unable to handle error {} when acquiring ephemeral lock in ZK", toString(e));
}
path = dynamic_cast<const Coordination::CreateResponse *>(responses.back().get())->path_created;
@ -193,4 +221,10 @@ EphemeralLocksInAllPartitions::~EphemeralLocksInAllPartitions()
}
}
template std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper<String>(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path);
template std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper<std::vector<String>>(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector<String> & deduplication_path);
}

View File

@ -26,11 +26,12 @@ namespace ErrorCodes
/// Since 22.11 it creates a single ephemeral node with `path_prefix` that references a persistent fake "secondary node".
class EphemeralLockInZooKeeper : public boost::noncopyable
{
template<typename T>
friend std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path);
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path);
protected:
EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_);
EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_, const String & conflict_path_ = "");
public:
EphemeralLockInZooKeeper() = delete;
@ -51,6 +52,7 @@ public:
rhs.zookeeper = nullptr;
path_prefix = std::move(rhs.path_prefix);
path = std::move(rhs.path);
conflict_path = std::move(rhs.conflict_path);
return *this;
}
@ -65,6 +67,13 @@ public:
return path;
}
// In case of async inserts, we try to get locks for multiple inserts at once and need to know which insert caused the conflict.
// That's why we need this function.
String getConflictPath() const
{
return conflict_path;
}
/// Parse the number at the end of the path.
UInt64 getNumber() const
{
@ -97,11 +106,12 @@ private:
ZooKeeperWithFaultInjectionPtr zookeeper;
String path_prefix;
String path;
String conflict_path;
};
template<typename T>
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path);
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path);
/// Acquires block number locks in all partitions.
class EphemeralLocksInAllPartitions : public boost::noncopyable

View File

@ -146,8 +146,43 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
stream.finalizer.finish();
}
std::vector<ChunkOffsetsPtr> scatterOffsetsBySelector(ChunkOffsetsPtr chunk_offsets, const IColumn::Selector & selector, size_t partition_num)
{
if (nullptr == chunk_offsets)
{
return {};
}
if (selector.empty())
{
return {chunk_offsets};
}
std::vector<ChunkOffsetsPtr> result(partition_num);
std::vector<Int64> last_row_for_partition(partition_num, -1);
size_t offset_idx = 0;
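/// Walk the rows once: last_row_for_partition[p] tracks the index of the last row
/// routed to partition p so far. Whenever the global row index reaches the next
/// sub-chunk boundary, the per-partition row counts so far become that partition's offsets.
/// E.g. (hypothetical) selector = [0, 1, 0, 1] and offsets = [2, 4]
/// yield offsets [1, 2] for both partitions.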
for (size_t i = 0; i < selector.size(); ++i)
{
++last_row_for_partition[selector[i]];
if (i + 1 == chunk_offsets->offsets[offset_idx])
{
for (size_t part_id = 0; part_id < last_row_for_partition.size(); ++part_id)
{
Int64 last_row = last_row_for_partition[part_id];
if (-1 == last_row)
continue;
size_t offset = static_cast<size_t>(last_row + 1);
if (result[part_id] == nullptr)
result[part_id] = std::make_shared<ChunkOffsets>();
if (result[part_id]->offsets.empty() || offset > *result[part_id]->offsets.rbegin())
result[part_id]->offsets.push_back(offset);
}
++offset_idx;
}
}
return result;
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, ChunkOffsetsPtr chunk_offsets)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -158,6 +193,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
if (!metadata_snapshot->hasPartitionKey()) /// Table is not partitioned.
{
result.emplace_back(Block(block), Row{});
result[0].offsets = chunk_offsets;
return result;
}
@ -174,6 +210,8 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts);
auto chunk_offsets_with_partition = scatterOffsetsBySelector(chunk_offsets, selector, partition_num_to_first_row.size());
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);
@ -191,6 +229,8 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
/// NOTE: returning a copy of the original block so that calculated partition key columns
/// do not interfere with possible calculated primary key columns of the same name.
result.emplace_back(Block(block), get_partition(0));
if (!chunk_offsets_with_partition.empty())
result[0].offsets = chunk_offsets_with_partition[0];
return result;
}
@ -204,6 +244,9 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
result[i].block.getByPosition(col).column = std::move(scattered[i]);
}
for (size_t i = 0; i < chunk_offsets_with_partition.size(); ++i)
result[i].offsets = chunk_offsets_with_partition[i];
return result;
}

View File

@ -9,6 +9,8 @@
#include <Interpreters/sortBlock.h>
#include <Processors/Chunk.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
@ -20,11 +22,17 @@ struct BlockWithPartition
{
Block block;
Row partition;
ChunkOffsetsPtr offsets;
BlockWithPartition(Block && block_, Row && partition_)
: block(block_), partition(std::move(partition_))
{
}
BlockWithPartition(Block && block_, Row && partition_, ChunkOffsetsPtr chunk_offsets_)
: block(block_), partition(std::move(partition_)), offsets(chunk_offsets_)
{
}
};
using BlocksWithPartition = std::vector<BlockWithPartition>;
@ -43,7 +51,7 @@ public:
* (split rows by partition)
* Works deterministically: if same block was passed, function will return same result in same order.
*/
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, ChunkOffsetsPtr chunk_offsets = nullptr);
/// This structure contains not completely written temporary part.
/// Some writes may happen asynchronously, e.g. for blob storages.

View File

@ -79,6 +79,8 @@ struct Settings;
/** Replication settings. */ \
M(UInt64, replicated_deduplication_window, 100, "How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted).", 0) \
M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it is outside of one \"window\". You can set a very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \
M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it is outside of one \"window\". You can set a very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in the log if there is an inactive replica. An inactive replica becomes lost when this number is exceeded.", 0) \
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \

View File

@ -1483,7 +1483,16 @@ bool MutateTask::execute()
if (task->executeStep())
return true;
promise.set_value(ctx->new_data_part);
// The `new_data_part` is a shared pointer and must be moved to allow
// part deletion in case it is needed in `MutateFromLogEntryTask::finalize`.
//
// `tryRemovePartImmediately` requires `std::shared_ptr::unique() == true`
// to delete the part timely. When there are multiple shared pointers,
// only the part state is changed to `Deleting`.
//
// Fetching a byte-identical part (in case of checksum mismatches) will fail with
// `Part ... should be deleted after previous attempt before fetch`.
promise.set_value(std::move(ctx->new_data_part));
return false;
}
}
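As a side note, the uniqueness requirement is easy to model outside the codebase (a hypothetical minimal example, not repository code):

```cpp
#include <cassert>
#include <future>
#include <memory>

int main()
{
    auto part = std::make_shared<int>(42);
    std::promise<std::shared_ptr<int>> promise;
    promise.set_value(std::move(part));  // move: no extra reference is left behind
    auto held = promise.get_future().get();
    assert(held.use_count() == 1);       // a copy would have kept use_count() at 2 here
}
```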

View File

@ -74,7 +74,9 @@ void ReplicatedMergeTreeCleanupThread::iterate()
if (storage.is_leader)
{
clearOldLogs();
clearOldBlocks();
auto storage_settings = storage.getSettings();
clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds, storage_settings->replicated_deduplication_window);
clearOldBlocks("async_blocks", storage_settings->replicated_deduplication_window_seconds_for_async_inserts, storage_settings->replicated_deduplication_window_for_async_inserts);
clearOldMutations();
storage.clearEmptyParts();
}
@ -321,10 +323,9 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat
}
};
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size)
{
auto zookeeper = storage.getZooKeeper();
auto storage_settings = storage.getSettings();
std::vector<NodeWithStat> timed_blocks;
getBlocksSortedByTime(*zookeeper, timed_blocks);
@ -336,12 +337,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
Int64 current_time = timed_blocks.front().ctime;
Int64 time_threshold = std::max(
static_cast<Int64>(0),
current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
current_time - static_cast<Int64>(1000 * window_seconds));
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold{{}, time_threshold, 0};
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage_settings->replicated_deduplication_window);
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), window_size);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(
timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
@ -359,7 +360,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{
String path = storage.zookeeper_path + "/blocks/" + it->node;
String path = storage.zookeeper_path + "/" + blocks_dir_name + "/" + it->node;
try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path, it->version));
}

View File

@ -53,7 +53,7 @@ private:
size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper);
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks();
void clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size);
/// Remove old mutations that are done from ZooKeeper. This is done by the leader replica.
void clearOldMutations();

View File

@ -8,6 +8,7 @@
#include <DataTypes/ObjectUtils.h>
#include <Core/Block.h>
#include <IO/Operators.h>
#include <fmt/core.h>
namespace ProfileEvents
{
@ -35,13 +36,39 @@ namespace ErrorCodes
extern const int QUERY_WAS_CANCELLED;
}
struct ReplicatedMergeTreeSink::DelayedChunk
template<bool async_insert>
struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
{
struct Partition
{
MergeTreeDataWriter::TemporaryPart temp_part;
UInt64 elapsed_ns;
String block_id;
BlockIDsType block_id;
BlockWithPartition block_with_partition;
std::unordered_map<String, size_t> block_id_to_offset_idx;
Partition() = default;
Partition(MergeTreeDataWriter::TemporaryPart && temp_part_, UInt64 elapsed_ns_, BlockIDsType && block_id_, BlockWithPartition && block_)
: temp_part(std::move(temp_part_)),
elapsed_ns(elapsed_ns_),
block_id(std::move(block_id_)),
block_with_partition(std::move(block_))
{
initBlockIDMap();
}
void initBlockIDMap()
{
if constexpr (async_insert)
{
block_id_to_offset_idx.clear();
for (size_t i = 0; i < block_id.size(); ++i)
{
block_id_to_offset_idx[block_id[i]] = i;
}
}
}
};
DelayedChunk() = default;
@ -52,7 +79,109 @@ struct ReplicatedMergeTreeSink::DelayedChunk
std::vector<Partition> partitions;
};
ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
namespace
{
/// Convert block id vector to string. Output at most 50 ids.
template<typename T>
inline String toString(const std::vector<T> & vec)
{
size_t size = vec.size();
if (size > 50) size = 50;
return fmt::format("({})", fmt::join(vec.begin(), vec.begin() + size, ","));
}
/// Remove the conflicting parts of the block so the rest can be written again.
void rewriteBlock(Poco::Logger * log, typename ReplicatedMergeTreeSinkImpl<true>::DelayedChunk::Partition & partition, const std::vector<String> & block_paths)
{
std::vector<size_t> offset_idx;
for (const auto & raw_path : block_paths)
{
std::filesystem::path p(raw_path);
String conflict_block_id = p.filename();
auto it = partition.block_id_to_offset_idx.find(conflict_block_id);
if (it == partition.block_id_to_offset_idx.end())
throw Exception("Unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR);
offset_idx.push_back(it->second);
}
std::sort(offset_idx.begin(), offset_idx.end());
auto & offsets = partition.block_with_partition.offsets->offsets;
size_t idx = 0, remove_count = 0;
auto it = offset_idx.begin();
std::vector<size_t> new_offsets;
std::vector<String> new_block_ids;
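/// E.g. (hypothetical) offsets = [2, 4, 6] with the sub-chunk at index 1 conflicting:
/// rows [2, 4) are zeroed in the filter, remove_count becomes 2,
/// and the surviving offsets are rewritten to [2, 4].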
/// construct filter
size_t rows = partition.block_with_partition.block.rows();
auto filter_col = ColumnUInt8::create(rows, 1u);
ColumnUInt8::Container & vec = filter_col->getData();
UInt8 * pos = vec.data();
for (auto & offset : offsets)
{
if (it != offset_idx.end() && *it == idx)
{
size_t start_pos = idx > 0 ? offsets[idx - 1] : 0;
size_t end_pos = offset;
remove_count += end_pos - start_pos;
while (start_pos < end_pos)
{
*(pos + start_pos) = 0;
++start_pos;
}
++it;
}
else
{
new_offsets.push_back(offset - remove_count);
new_block_ids.push_back(partition.block_id[idx]);
}
idx++;
}
LOG_TRACE(log, "New block IDs: {}, new offsets: {}, size: {}", toString(new_block_ids), toString(new_offsets), new_offsets.size());
offsets = std::move(new_offsets);
partition.block_id = std::move(new_block_ids);
auto cols = partition.block_with_partition.block.getColumns();
for (auto & col : cols)
{
col = col->filter(vec, rows - remove_count);
}
partition.block_with_partition.block.setColumns(cols);
LOG_TRACE(log, "New block rows {}", partition.block_with_partition.block.rows());
partition.initBlockIDMap();
}
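For intuition, here is a standalone sketch of the offset rewrite that `rewriteBlock` performs, using plain vectors instead of the real ClickHouse column types; the names and sample data are illustrative only, not part of the original diff:

``` cpp
#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    // Three async inserts packed into one block: rows [0,3), [3,5), [5,9).
    std::vector<size_t> offsets = {3, 5, 9};
    // Suppose the second insert (index 1) conflicted with an existing block ID.
    std::vector<size_t> conflict_idx = {1};

    std::vector<size_t> new_offsets;
    std::vector<char> filter(offsets.back(), 1);
    size_t removed = 0;
    auto it = conflict_idx.begin();
    for (size_t idx = 0; idx < offsets.size(); ++idx)
    {
        size_t begin = idx ? offsets[idx - 1] : 0;
        if (it != conflict_idx.end() && *it == idx)
        {
            for (size_t row = begin; row < offsets[idx]; ++row)
                filter[row] = 0;                            // mask out the duplicated rows
            removed += offsets[idx] - begin;
            ++it;
        }
        else
            new_offsets.push_back(offsets[idx] - removed);  // rebase the kept insert
    }

    for (size_t off : new_offsets)
        std::cout << off << ' ';                            // prints: 3 7
    std::cout << '\n';
}
```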
std::vector<String> getHashesForBlocks(BlockWithPartition & block, String partition_id)
{
size_t start = 0;
auto cols = block.block.getColumns();
std::vector<String> block_id_vec;
for (auto offset : block.offsets->offsets)
{
SipHash hash;
for (size_t i = start; i < offset; ++i)
for (const auto & col : cols)
col->updateHashWithValue(i, hash);
union
{
char bytes[16];
UInt64 words[2];
} hash_value;
hash.get128(hash_value.bytes);
block_id_vec.push_back(partition_id + "_" + DB::toString(hash_value.words[0]) + "_" + DB::toString(hash_value.words[1]));
start = offset;
}
return block_id_vec;
}
}
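The block IDs produced by getHashesForBlocks have the form `<partition_id>_<word0>_<word1>`, where the two words are the halves of the 128-bit SipHash over the rows of one sub-block. A minimal sketch of the format, using `std::hash` purely as a stand-in for SipHash:

``` cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

int main()
{
    std::string partition_id = "202212";
    // In the real code both words come from SipHash::get128 over the rows
    // of the sub-block; std::hash is only an illustrative stand-in.
    uint64_t word0 = std::hash<std::string>{}("rows-of-one-async-insert");
    uint64_t word1 = std::hash<std::string>{}("second-hash-word");
    std::string block_id = partition_id + "_" + std::to_string(word0) + "_" + std::to_string(word1);
    std::cout << block_id << '\n';  // e.g. "202212_10939680300472029830_1374182159875799104"
}
```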
template<bool async_insert>
ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_size,
@ -81,7 +210,8 @@ ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
required_quorum_size = 0;
}
ReplicatedMergeTreeSink::~ReplicatedMergeTreeSink() = default;
template<bool async_insert>
ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl() = default;
/// Allow to verify that the session in ZooKeeper is still alive.
static void assertSessionIsNotExpired(const zkutil::ZooKeeperPtr & zookeeper)
@ -93,7 +223,8 @@ static void assertSessionIsNotExpired(const zkutil::ZooKeeperPtr & zookeeper)
throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER);
}
size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper)
template<bool async_insert>
size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper)
{
if (!isQuorumEnabled())
return 0;
@ -161,7 +292,8 @@ size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(const ZooKeeperWithFault
return replicas_number;
}
void ReplicatedMergeTreeSink::consume(Chunk chunk)
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
@ -197,9 +329,22 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
if (!storage_snapshot->object_columns.empty())
convertDynamicColumnsToTuples(block, storage_snapshot);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition>;
ChunkOffsetsPtr chunk_offsets;
if constexpr (async_insert)
{
const auto & chunk_info = chunk.getChunkInfo();
if (const auto * chunk_offsets_ptr = typeid_cast<const ChunkOffsets *>(chunk_info.get()))
chunk_offsets = std::make_shared<ChunkOffsets>(chunk_offsets_ptr->offsets);
else
throw Exception("No chunk info for async inserts", ErrorCodes::LOGICAL_ERROR);
}
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context, chunk_offsets);
using DelayedPartition = typename ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk::Partition;
using DelayedPartitions = std::vector<DelayedPartition>;
DelayedPartitions partitions;
size_t streams = 0;
@ -218,9 +363,15 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
if (!temp_part.part)
continue;
String block_id;
BlockIDsType block_id;
if (deduplicate)
if constexpr (async_insert)
{
/// TODO consider insert_deduplication_token
block_id = getHashesForBlocks(current_block, temp_part.part->info.partition_id);
LOG_TRACE(log, "async insert part, part id {}, block id {}, offsets {}, size {}", temp_part.part->info.partition_id, toString(block_id), toString(current_block.offsets->offsets), current_block.offsets->offsets.size());
}
else if (deduplicate)
{
String block_dedup_token;
@ -255,7 +406,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
if (streams > max_insert_delayed_streams_for_parallel_write)
{
finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSink::DelayedChunk>(replicas_num);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk>(replicas_num);
delayed_chunk->partitions = std::move(partitions);
finishDelayedChunk(zookeeper);
@ -264,15 +415,16 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
partitions = DelayedPartitions{};
}
partitions.emplace_back(ReplicatedMergeTreeSink::DelayedChunk::Partition{
.temp_part = std::move(temp_part),
.elapsed_ns = elapsed_ns,
.block_id = std::move(block_id)
});
partitions.emplace_back(DelayedPartition(
std::move(temp_part),
elapsed_ns,
std::move(block_id),
std::move(current_block)
));
}
finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSink::DelayedChunk>();
delayed_chunk = std::make_unique<ReplicatedMergeTreeSinkImpl::DelayedChunk>();
delayed_chunk->partitions = std::move(partitions);
/// If deduplicated data should not be inserted into MV, we need to set proper
@ -283,7 +435,8 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
finishDelayedChunk(zookeeper);
}
void ReplicatedMergeTreeSink::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
template<>
void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
{
if (!delayed_chunk)
return;
@ -317,7 +470,36 @@ void ReplicatedMergeTreeSink::finishDelayedChunk(const ZooKeeperWithFaultInjecti
delayed_chunk.reset();
}
void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
template<>
void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
{
if (!delayed_chunk)
return;
for (auto & partition: delayed_chunk->partitions)
{
int retry_times = 0;
while (true)
{
partition.temp_part.finalize();
auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num, false);
if (conflict_block_ids.empty())
break;
LOG_DEBUG(log, "Found depulicate block IDs: {}, retry times {}", toString(conflict_block_ids), ++retry_times);
/// Remove the conflicting rows from the partition so the block can be rewritten.
rewriteBlock(log, partition, conflict_block_ids);
if (partition.block_id.empty())
break;
partition.block_with_partition.partition = std::move(partition.temp_part.part->partition.value);
partition.temp_part = storage.writer.writeTempPart(partition.block_with_partition, metadata_snapshot, context);
}
}
delayed_chunk.reset();
}
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
/// NOTE: No delay in this case. That's Ok.
@ -332,7 +514,7 @@ void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPt
try
{
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
commitPart(zookeeper, part, "", replicas_num, true);
commitPart(zookeeper, part, BlockIDsType(), replicas_num, true);
PartLog::addNewPart(storage.getContext(), part, watch.elapsed());
}
catch (...)
@ -342,10 +524,11 @@ void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPt
}
}
void ReplicatedMergeTreeSink::commitPart(
template<bool async_insert>
std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part,
const String & block_id,
const BlockIDsType & block_id,
size_t replicas_num,
bool writing_existing_part)
{
@ -367,6 +550,7 @@ void ReplicatedMergeTreeSink::commitPart(
/// for retries due to keeper error
bool part_committed_locally_but_zookeeper = false;
Coordination::Error write_part_info_keeper_error = Coordination::Error::ZOK;
std::vector<String> conflict_block_ids;
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info);
retries_ctl.retryLoop([&]()
@ -422,8 +606,15 @@ void ReplicatedMergeTreeSink::commitPart(
/// Also, make deduplication check. If a duplicate is detected, no nodes are created.
/// Allocate new block number and check for duplicates
const bool deduplicate_block = !block_id.empty();
String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
bool deduplicate_block = !block_id.empty();
BlockIDsType block_id_path;
if constexpr (async_insert)
{
for (const auto & single_block_id : block_id)
block_id_path.push_back(storage.zookeeper_path + "/async_blocks/" + single_block_id);
}
else if (deduplicate_block)
block_id_path = storage.zookeeper_path + "/blocks/" + block_id;
auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
ThreadFuzzer::maybeInjectSleep();
@ -436,6 +627,20 @@ void ReplicatedMergeTreeSink::commitPart(
String existing_part_name;
if (block_number_lock)
{
if constexpr (async_insert)
{
/// In practice we always get only one path from block_number_lock;
/// this is a restriction of Keeper. We still use a vector here to keep
/// extensibility for future optimizations, for instance, using a
/// cache to resolve conflicts in advance.
String conflict_path = block_number_lock->getConflictPath();
if (!conflict_path.empty())
{
LOG_TRACE(log, "Cannot get lock, the conflict path is {}", conflict_path);
conflict_block_ids.push_back(conflict_path);
return;
}
}
is_already_existing_part = false;
block_number = block_number_lock->getNumber();
@ -467,7 +672,8 @@ void ReplicatedMergeTreeSink::commitPart(
log_entry.new_part_name = part->name;
/// TODO maybe add UUID here as well?
log_entry.quorum = getQuorumSize(replicas_num);
log_entry.block_id = block_id;
if constexpr (!async_insert)
log_entry.block_id = block_id;
log_entry.new_part_type = part->getType();
ops.emplace_back(zkutil::makeCreateRequest(
@ -523,7 +729,8 @@ void ReplicatedMergeTreeSink::commitPart(
quorum_info.host_node_version));
}
}
else
/// Async inserts never return a null lock, because they need the conflict path.
else if constexpr (!async_insert)
{
is_already_existing_part = true;
@ -577,6 +784,8 @@ void ReplicatedMergeTreeSink::commitPart(
/// Do not check for duplicate on commit to ZK.
block_id_path.clear();
}
else
throw Exception("Conflict block ids and block number lock should not be empty at the same time for async inserts", ErrorCodes::LOGICAL_ERROR);
/// Information about the part.
storage.getCommitPartOps(ops, part, block_id_path);
@ -675,11 +884,24 @@ void ReplicatedMergeTreeSink::commitPart(
{
String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();
if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
auto contains = [](const auto & block_ids, const String & path)
{
if constexpr (async_insert)
{
for (const auto & local_block_id : block_ids)
if (local_block_id == path)
return true;
return false;
}
else
return block_ids == path;
};
if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && contains(block_id_path, failed_op_path))
{
/// A block with the same ID has just appeared in the table (or on another replica); roll back the insertion.
LOG_INFO(log, "Block with ID {} already exists (it has just appeared). Renaming part {} back to {}. Will retry write.",
block_id, part->name, temporary_part_relative_path);
toString(block_id), part->name, temporary_part_relative_path);
/// We will try to add this part again on the new iteration as it's just a new part.
/// So remove it from storage parts set immediately and transfer state to temporary.
@ -688,6 +910,13 @@ void ReplicatedMergeTreeSink::commitPart(
part->is_temp = true;
part->renameTo(temporary_part_relative_path, false);
if constexpr (async_insert)
{
conflict_block_ids = std::vector<String>({failed_op_path});
LOG_TRACE(log, "conflict when committing, the conflict block ids are {}", toString(conflict_block_ids));
return;
}
/// If this part appeared on other replica than it's better to try to write it locally one more time. If it's our part
/// than it will be ignored on the next iteration.
++loop_counter;
@ -714,7 +943,7 @@ void ReplicatedMergeTreeSink::commitPart(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected logical error while adding block {} with ID '{}': {}, path {}",
block_number,
block_id,
toString(block_id),
Coordination::errorMessage(multi_code),
failed_op_path);
}
@ -727,12 +956,13 @@ void ReplicatedMergeTreeSink::commitPart(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
block_number,
block_id,
toString(block_id),
Coordination::errorMessage(multi_code));
}
},
[&zookeeper]() { zookeeper->cleanupEphemeralNodes(); });
if (!conflict_block_ids.empty())
return conflict_block_ids;
if (isQuorumEnabled())
{
ZooKeeperRetriesControl quorum_retries_ctl("waitForQuorum", zookeeper_retries_info);
@ -756,23 +986,27 @@ void ReplicatedMergeTreeSink::commitPart(
return;
});
}
return {};
}
void ReplicatedMergeTreeSink::onStart()
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::onStart()
{
/// Only check "too many parts" before write,
/// because interrupting long-running INSERT query in the middle is not convenient for users.
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context);
}
void ReplicatedMergeTreeSink::onFinish()
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::onFinish()
{
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
finishDelayedChunk(std::make_shared<ZooKeeperWithFaultInjection>(zookeeper));
}
void ReplicatedMergeTreeSink::waitForQuorum(
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::waitForQuorum(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const std::string & part_name,
const std::string & quorum_path,
@ -826,14 +1060,16 @@ void ReplicatedMergeTreeSink::waitForQuorum(
LOG_TRACE(log, "Quorum '{}' for part {} satisfied", quorum_path, part_name);
}
String ReplicatedMergeTreeSink::quorumLogMessage(size_t replicas_num) const
template<bool async_insert>
String ReplicatedMergeTreeSinkImpl<async_insert>::quorumLogMessage(size_t replicas_num) const
{
if (!isQuorumEnabled())
return "";
return fmt::format(" (quorum {} of {} replicas)", getQuorumSize(replicas_num), replicas_num);
}
size_t ReplicatedMergeTreeSink::getQuorumSize(size_t replicas_num) const
template<bool async_insert>
size_t ReplicatedMergeTreeSinkImpl<async_insert>::getQuorumSize(size_t replicas_num) const
{
if (!isQuorumEnabled())
return 0;
@ -844,9 +1080,13 @@ size_t ReplicatedMergeTreeSink::getQuorumSize(size_t replicas_num) const
return replicas_num / 2 + 1;
}
bool ReplicatedMergeTreeSink::isQuorumEnabled() const
template<bool async_insert>
bool ReplicatedMergeTreeSinkImpl<async_insert>::isQuorumEnabled() const
{
return !required_quorum_size.has_value() || required_quorum_size.value() > 1;
}
template class ReplicatedMergeTreeSinkImpl<true>;
template class ReplicatedMergeTreeSinkImpl<false>;
}

View File

@ -23,10 +23,16 @@ struct StorageSnapshot;
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
class ReplicatedMergeTreeSink : public SinkToStorage
/// ReplicatedMergeTreeSink sinks data into a replicated merge tree with deduplication.
/// The template argument "async_insert" indicates whether this sink serves for async inserts.
/// Async inserts have a different deduplication policy: a vector of "block ids" identifies
/// the different async inserts packed into the same part, and the duplicate inserts are
/// removed when a lock conflict is encountered and the write is retried.
template<bool async_insert>
class ReplicatedMergeTreeSinkImpl : public SinkToStorage
{
public:
ReplicatedMergeTreeSink(
ReplicatedMergeTreeSinkImpl(
StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_,
@ -40,7 +46,7 @@ public:
// needed to set the special LogEntryType::ATTACH_PART
bool is_attach_ = false);
~ReplicatedMergeTreeSink() override;
~ReplicatedMergeTreeSinkImpl() override;
void onStart() override;
void consume(Chunk chunk) override;
@ -61,7 +67,10 @@ public:
return last_block_is_duplicate;
}
struct DelayedChunk;
private:
using BlockIDsType = std::conditional_t<async_insert, std::vector<String>, String>;
ZooKeeperRetriesInfo zookeeper_retries_info;
struct QuorumInfo
{
@ -77,10 +86,10 @@ private:
size_t checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper);
/// Rename temporary part and commit to ZooKeeper.
void commitPart(
std::vector<String> commitPart(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part,
const String & block_id,
const BlockIDsType & block_id,
size_t replicas_num,
bool writing_existing_part);
@ -120,10 +129,12 @@ private:
UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
/// We can delay processing for previous chunk and start writing a new one.
struct DelayedChunk;
std::unique_ptr<DelayedChunk> delayed_chunk;
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
};
using ReplicatedMergeTreeSinkWithAsyncDeduplicate = ReplicatedMergeTreeSinkImpl<true>;
using ReplicatedMergeTreeSink = ReplicatedMergeTreeSinkImpl<false>;
}
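The key type switch in this class is `BlockIDsType`. A minimal sketch of how the `std::conditional_t` alias behaves for the two instantiations (a standalone illustration, not the real header):

``` cpp
#include <string>
#include <type_traits>
#include <vector>

// Async inserts carry a vector of block IDs; ordinary inserts a single one.
template <bool async_insert>
using BlockIDsType = std::conditional_t<async_insert, std::vector<std::string>, std::string>;

static_assert(std::is_same_v<BlockIDsType<true>, std::vector<std::string>>);
static_assert(std::is_same_v<BlockIDsType<false>, std::string>);

int main() {}
```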

View File

@ -0,0 +1,45 @@
#include "config.h"
#include <gtest/gtest.h>
#include <Processors/Chunk.h>
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
namespace DB {
std::vector<ChunkOffsetsPtr> scatterOffsetsBySelector(ChunkOffsetsPtr chunk_offsets, const IColumn::Selector & selector, size_t partition_num);
class AsyncInsertsTest : public ::testing::TestPartResult
{};
TEST(AsyncInsertsTest, testScatterOffsetsBySelector)
{
auto test_impl = [](std::vector<size_t> offsets, std::vector<size_t> selector_data, size_t part_num, std::vector<std::vector<size_t>> expected)
{
auto offset_ptr = std::make_shared<ChunkOffsets>(offsets);
IColumn::Selector selector(selector_data.size());
size_t num_rows = selector_data.size();
for (size_t i = 0; i < num_rows; i++)
selector[i] = selector_data[i];
auto results = scatterOffsetsBySelector(offset_ptr, selector, part_num);
ASSERT_EQ(results.size(), expected.size());
for (size_t i = 0; i < results.size(); i++)
{
auto result = results[i]->offsets;
auto expect = expected[i];
ASSERT_EQ(result.size(), expect.size());
for (size_t j = 0; j < result.size(); j++)
ASSERT_EQ(result[j], expect[j]);
}
};
test_impl({5}, {0,1,0,1,0}, 2, {{3},{2}});
test_impl({5,10}, {0,1,0,1,0,1,0,1,0,1}, 2, {{3,5},{2,5}});
test_impl({4,8,12}, {0,1,0,1,0,2,0,2,1,2,1,2}, 3, {{2,4},{2,4},{2,4}});
test_impl({1,2,3,4,5}, {0,1,2,3,4}, 5, {{1},{1},{1},{1},{1}});
test_impl({3,6,10}, {1,1,1,2,2,2,0,0,0,0}, 3, {{4},{3},{3}});
}
}
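For reference, a standalone sketch of the scattering these cases exercise, assuming the semantics implied by the expected values: within each sub-block, rows are counted per partition, and every partition that received rows in that sub-block gets its cumulative count appended as a new offset. This is an illustration, not the real implementation:

``` cpp
#include <cstddef>
#include <iostream>
#include <vector>

std::vector<std::vector<size_t>> scatterOffsetsSketch(
    const std::vector<size_t> & offsets, const std::vector<size_t> & selector, size_t part_num)
{
    std::vector<std::vector<size_t>> result(part_num);
    std::vector<size_t> counts(part_num, 0);
    size_t row = 0;
    for (size_t offset : offsets)
    {
        std::vector<bool> touched(part_num, false);
        for (; row < offset; ++row)
        {
            ++counts[selector[row]];
            touched[selector[row]] = true;
        }
        for (size_t p = 0; p < part_num; ++p)
            if (touched[p])
                result[p].push_back(counts[p]);  // cumulative count becomes the new offset
    }
    return result;
}

int main()
{
    // Mirrors the second test case above: expect {3,5} for partition 0, {2,5} for partition 1.
    auto res = scatterOffsetsSketch({5, 10}, {0,1,0,1,0,1,0,1,0,1}, 2);
    for (const auto & part : res)
    {
        for (size_t off : part)
            std::cout << off << ' ';
        std::cout << '\n';
    }
}
```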

View File

@ -701,6 +701,8 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/blocks", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/async_blocks", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/block_numbers", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/nonincrement_block_numbers", "",
@ -1011,7 +1013,7 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
/// NOTE /block_numbers/ actually is not flat, because /block_numbers/<partition_id>/ may have ephemeral children,
/// but we assume that all ephemeral block locks are already removed when table is being dropped.
static constexpr std::array flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids"};
static constexpr std::array flat_nodes = {"block_numbers", "blocks", "async_blocks", "leader_election", "log", "mutations", "pinned_part_uuids"};
/// First try to remove paths that are known to be flat
for (const auto * node : flat_nodes)
@ -4521,6 +4523,16 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
const auto storage_settings_ptr = getSettings();
const Settings & query_settings = local_context->getSettingsRef();
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
bool async_deduplicate = query_settings.async_insert && storage_settings_ptr->replicated_deduplication_window_for_async_inserts != 0 && query_settings.insert_deduplicate;
if (async_deduplicate)
return std::make_shared<ReplicatedMergeTreeSinkWithAsyncDeduplicate>(
*this, metadata_snapshot, query_settings.insert_quorum.valueOr(0),
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
query_settings.insert_quorum_parallel,
deduplicate,
query_settings.insert_quorum.is_auto,
local_context);
// TODO: should we also somehow pass list of columns to deduplicate on to the ReplicatedMergeTreeSink?
return std::make_shared<ReplicatedMergeTreeSink>(
@ -5342,7 +5354,6 @@ bool StorageReplicatedMergeTree::existsNodeCached(const ZooKeeperWithFaultInject
return res;
}
std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id,
const zkutil::ZooKeeperPtr & zookeeper,
@ -5353,11 +5364,11 @@ std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBloc
partition_id, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_block_id_path, zookeeper_path_prefix);
}
template<typename T>
std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const String & zookeeper_block_id_path,
const T & zookeeper_block_id_path,
const String & zookeeper_path_prefix) const
{
String zookeeper_table_path;
@ -6529,17 +6540,24 @@ void StorageReplicatedMergeTree::clearLockedBlockNumbersInPartition(
void StorageReplicatedMergeTree::getClearBlocksInPartitionOps(
Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num)
{
getClearBlocksInPartitionOpsImpl(ops, zookeeper, partition_id, min_block_num, max_block_num, "blocks");
getClearBlocksInPartitionOpsImpl(ops, zookeeper, partition_id, min_block_num, max_block_num, "async_blocks");
}
void StorageReplicatedMergeTree::getClearBlocksInPartitionOpsImpl(
Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num, const String & blocks_dir_name)
{
Strings blocks;
if (Coordination::Error::ZOK != zookeeper.tryGetChildren(fs::path(zookeeper_path) / "blocks", blocks))
throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (Coordination::Error::ZOK != zookeeper.tryGetChildren(fs::path(zookeeper_path) / blocks_dir_name, blocks))
throw Exception(zookeeper_path + "/" + blocks_dir_name + "blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
String partition_prefix = partition_id + "_";
Strings paths_to_get;
for (const String & block_id : blocks)
if (startsWith(block_id, partition_prefix))
paths_to_get.push_back(fs::path(zookeeper_path) / "blocks" / block_id);
paths_to_get.push_back(fs::path(zookeeper_path) / blocks_dir_name / block_id);
auto results = zookeeper.tryGet(paths_to_get);
@ -7144,11 +7162,21 @@ void StorageReplicatedMergeTree::getCommitPartOps(
Coordination::Requests & ops,
const DataPartPtr & part,
const String & block_id_path) const
{
if (block_id_path.empty())
return getCommitPartOps(ops, part, std::vector<String>());
else
return getCommitPartOps(ops, part, std::vector<String>({block_id_path}));
}
void StorageReplicatedMergeTree::getCommitPartOps(
Coordination::Requests & ops,
const DataPartPtr & part,
const std::vector<String> & block_id_paths) const
{
const String & part_name = part->name;
const auto storage_settings_ptr = getSettings();
if (!block_id_path.empty())
for (const String & block_id_path : block_id_paths)
{
/// Make final duplicate check and commit block_id
ops.emplace_back(
@ -8724,6 +8752,18 @@ void StorageReplicatedMergeTree::attachRestoredParts(MutableDataPartsVector && p
sink->writeExistingPart(part);
}
template std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber<String>(
const String & partition_id,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const String & zookeeper_block_id_path,
const String & zookeeper_path_prefix) const;
template std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber<std::vector<String>>(
const String & partition_id,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const std::vector<String> & zookeeper_block_id_path,
const String & zookeeper_path_prefix) const;
#if 0
PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);

View File

@ -356,7 +356,8 @@ private:
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
friend class ReplicatedMergeTreeSink;
template<bool async_insert>
friend class ReplicatedMergeTreeSinkImpl;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;
@ -554,6 +555,8 @@ private:
void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const String & block_id_path = "") const;
void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const std::vector<String> & block_id_paths) const;
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
void removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children);
@ -730,10 +733,12 @@ private:
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;
template<typename T>
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const String & zookeeper_block_id_path = "",
const T & zookeeper_block_id_path,
const String & zookeeper_path_prefix = "") const;
/** Wait until all replicas, including this, execute the specified action from the log.
@ -778,6 +783,8 @@ private:
void clearLockedBlockNumbersInPartition(zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
void getClearBlocksInPartitionOpsImpl(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num, const String & blocks_dir_name);
/// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
void clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);

View File

@ -7,6 +7,8 @@
#include <Storages/VirtualColumnUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/queryToString.h>
#include <Processors/ISource.h>
#include <QueryPipeline/Pipe.h>
@ -24,6 +26,7 @@ StorageSystemDataSkippingIndices::StorageSystemDataSkippingIndices(const Storage
{ "table", std::make_shared<DataTypeString>() },
{ "name", std::make_shared<DataTypeString>() },
{ "type", std::make_shared<DataTypeString>() },
{ "type_full", std::make_shared<DataTypeString>() },
{ "expr", std::make_shared<DataTypeString>() },
{ "granularity", std::make_shared<DataTypeUInt64>() },
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
@ -121,6 +124,14 @@ protected:
// 'type' column
if (column_mask[src_index++])
res_columns[res_index++]->insert(index.type);
// 'type_full' column
if (column_mask[src_index++])
{
if (auto * expression = index.definition_ast->as<ASTIndexDeclaration>(); expression && expression->type)
res_columns[res_index++]->insert(queryToString(*expression->type));
else
res_columns[res_index++]->insertDefault();
}
// 'expr' column
if (column_mask[src_index++])
{

View File

@ -65,7 +65,7 @@ private:
};
using DiskImplementations = testing::Types<DB::DiskMemory, DB::DiskLocal>;
using DiskImplementations = testing::Types<DB::DiskLocal>;
TYPED_TEST_SUITE(StorageLogTest, DiskImplementations);
// Returns data written to table in Values format.

View File

@ -388,7 +388,8 @@ class Release:
body_file = get_abs_path(".github/PULL_REQUEST_TEMPLATE.md")
self.run(
f"gh pr create --repo {self.repo} --title 'Update version after "
f"release' --head {helper_branch} --body-file '{body_file}'"
f"release' --head {helper_branch} --body-file '{body_file}' "
"--label 'do not test' --assignee @me"
)
# Here the testing part is done
yield

View File

@ -1,9 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<disk_memory>
<type>memory</type>
</disk_memory>
</disks>
</storage_configuration>
</clickhouse>

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -15,9 +15,6 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3>
<disk_memory>
<type>memory</type>
</disk_memory>
<disk_encrypted>
<type>encrypted</type>
<disk>disk_s3</disk>

View File

@ -7,9 +7,6 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3>
<disk_memory>
<type>memory</type>
</disk_memory>
<disk_hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/</endpoint>

View File

@ -5,7 +5,6 @@ from helpers.test_tools import TSV
disk_types = {
"default": "local",
"disk_s3": "s3",
"disk_memory": "memory",
"disk_hdfs": "hdfs",
"disk_encrypted": "s3",
}

View File

@ -10,6 +10,12 @@
<type>local</type>
<path>/var/lib/clickhouse/path2/</path>
</test2>
<test3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</test3>
</disks>
<policies>
<test1>
@ -26,6 +32,13 @@
</main>
</volumes>
</test2>
<test3>
<volumes>
<main>
<disk>test3</disk>
</main>
</volumes>
</test3>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -10,8 +10,7 @@ def started_cluster():
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"disks_app_test",
main_configs=["config.xml"],
"disks_app_test", main_configs=["config.xml"], with_minio=True
)
cluster.start()
@ -33,6 +32,18 @@ def init_data(source):
source.query("INSERT INTO test_table(*) VALUES ('test1', 2)")
def init_data_s3(source):
source.query("DROP TABLE IF EXISTS test_table_s3")
source.query(
"CREATE TABLE test_table_s3(word String, value UInt64) "
"ENGINE=MergeTree() "
"ORDER BY word SETTINGS storage_policy = 'test3'"
)
source.query("INSERT INTO test_table_s3(*) VALUES ('test1', 2)")
def test_disks_app_func_ld(started_cluster):
source = cluster.instances["disks_app_test"]
@ -302,3 +313,32 @@ def test_disks_app_func_read_write(started_cluster):
files = out.split("\n")
assert files[0] == "tester"
def test_remote_disk_list(started_cluster):
source = cluster.instances["disks_app_test"]
init_data_s3(source)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--save-logs", "--disk", "test3", "list", "."]
)
files = out.split("\n")
assert files[0] == "store"
out = source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"--save-logs",
"--disk",
"test3",
"list",
".",
"--recursive",
]
)
assert ".:\nstore\n" in out
assert "\n./store:\n" in out

View File

@ -7,9 +7,6 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3>
<disk_memory>
<type>memory</type>
</disk_memory>
<disk_local>
<type>local</type>
<path>/disk/</path>

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,49 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True)
node2 = cluster.add_instance("node2", with_zookeeper=True, stay_alive=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
for ix, node in enumerate([node1, node2]):
node.query_with_retry(
"""CREATE TABLE fetch_fallback (k int, v int, z String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t0', '{}')
ORDER BY tuple()""".format(
ix
)
)
yield cluster
finally:
cluster.shutdown()
def test_mutation_fetch_fallback(start_cluster):
node1.query("INSERT INTO fetch_fallback(k, v) VALUES (1, 3), (2, 7), (3, 4)")
node2.stop_clickhouse()
# Run a mutation using non-deterministic `hostName` function to produce
# different results on replicas and exercise the code responsible for
# discarding local mutation results and fetching "byte-identical" parts
# instead from the replica which first committed the mutation.
node1.query(
"ALTER TABLE fetch_fallback UPDATE z = hostName() WHERE 1 = 1",
settings={"mutations_sync": 1, "allow_nondeterministic_mutations": 1},
)
node2.start_clickhouse()
node1.query("SYSTEM SYNC REPLICA fetch_fallback", timeout=10)
node2.query("SYSTEM SYNC REPLICA fetch_fallback", timeout=10)
assert node2.contains_in_log(
"We will download merged part from replica to force byte-identical result."
)

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

View File

@ -1 +0,0 @@
#!/usr/bin/env python3

Some files were not shown because too many files have changed in this diff.