mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 17:50:47 +00:00
Merge remote-tracking branch 'upstream/master' into fix4
This commit is contained in:
commit
c878af8740
1
.gitignore
vendored
1
.gitignore
vendored
@ -35,6 +35,7 @@ cmake_install.cmake
|
||||
CTestTestfile.cmake
|
||||
*.a
|
||||
*.o
|
||||
cmake-build-*
|
||||
|
||||
# Python cache
|
||||
*.pyc
|
||||
|
@ -230,6 +230,7 @@ include (cmake/find_readline_edit.cmake)
|
||||
include (cmake/find_zookeeper.cmake)
|
||||
include (cmake/find_re2.cmake)
|
||||
include (cmake/find_rdkafka.cmake)
|
||||
include (cmake/find_capnp.cmake)
|
||||
|
||||
include (cmake/find_contrib_lib.cmake)
|
||||
find_contrib_lib(cityhash)
|
||||
|
22
cmake/find_capnp.cmake
Normal file
22
cmake/find_capnp.cmake
Normal file
@ -0,0 +1,22 @@
|
||||
# Locates the Cap'n Proto libraries (capnp, capnpc, kj) and headers.
# On success sets USE_CAPNP to 1, adds the include directory and fills CAPNP_LIBS;
# otherwise USE_CAPNP stays unset and the build proceeds without Cap'n Proto support.
option (ENABLE_CAPNP "Enable Cap'n Proto" ON)

if (ENABLE_CAPNP)
    # Hint directories passed to the find_* commands below.
    set (CAPNP_PATHS "/usr/local/opt/capnp/lib")
    set (CAPNP_INCLUDE_PATHS "/usr/local/opt/capnp/include")
    find_library (CAPNP capnp PATHS ${CAPNP_PATHS})
    find_library (CAPNPC capnpc PATHS ${CAPNP_PATHS})
    find_library (KJ kj PATHS ${CAPNP_PATHS})
    set (CAPNP_LIBS ${CAPNP} ${CAPNPC} ${KJ})

    find_path (CAPNP_INCLUDE_DIR NAMES capnp/schema-parser.h PATHS ${CAPNP_INCLUDE_PATHS})
    # Test every library variable on its own. Testing the joined CAPNP_LIBS list is not enough:
    # a list such as "CAPNP-NOTFOUND;/usr/lib/libkj.so" does not end in -NOTFOUND and is
    # therefore considered true by if(), although one of the libraries is missing.
    if (CAPNP_INCLUDE_DIR AND CAPNP AND CAPNPC AND KJ)
        include_directories (${CAPNP_INCLUDE_DIR})
        set(USE_CAPNP 1)
    endif ()
endif ()

if (USE_CAPNP)
    message (STATUS "Using capnp=${USE_CAPNP}: ${CAPNP_INCLUDE_DIR} : ${CAPNP_LIBS}")
else ()
    message (STATUS "Build without capnp (support for Cap'n Proto format will be disabled)")
endif ()
|
2
contrib/poco
vendored
2
contrib/poco
vendored
@ -1 +1 @@
|
||||
Subproject commit ad1643c6698a8c890b68186d5c9d72e496c27af2
|
||||
Subproject commit 1366df1c7e068bb2efd846bc8dc8e286b090904e
|
@ -182,6 +182,10 @@ if (USE_ICU)
|
||||
target_link_libraries (dbms ${ICU_LIBS})
|
||||
endif ()
|
||||
|
||||
if (USE_CAPNP)
|
||||
target_link_libraries (dbms ${CAPNP_LIBS})
|
||||
endif ()
|
||||
|
||||
target_link_libraries (dbms
|
||||
${PLATFORM_LIBS}
|
||||
${CMAKE_DL_LIBS}
|
||||
|
@ -1,3 +1,5 @@
|
||||
#include <Common/Allocator.h>
|
||||
|
||||
#if !defined(__APPLE__) && !defined(__FreeBSD__)
|
||||
#include <malloc.h>
|
||||
#endif
|
||||
@ -7,10 +9,10 @@
|
||||
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/Allocator.h>
|
||||
|
||||
#include <Common/formatReadable.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
|
||||
/// Required for older Darwin builds, that lack definition of MAP_ANONYMOUS
|
||||
#ifndef MAP_ANONYMOUS
|
||||
#define MAP_ANONYMOUS MAP_ANON
|
||||
@ -54,11 +56,12 @@ void * Allocator<clear_memory_>::alloc(size_t size, size_t alignment)
|
||||
if (size >= MMAP_THRESHOLD)
|
||||
{
|
||||
if (alignment > MMAP_MIN_ALIGNMENT)
|
||||
throw DB::Exception("Too large alignment: more than page size.", DB::ErrorCodes::BAD_ARGUMENTS);
|
||||
throw DB::Exception("Too large alignment " + formatReadableSizeWithBinarySuffix(alignment) + ": more than page size when allocating "
|
||||
+ formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
|
||||
if (MAP_FAILED == buf)
|
||||
DB::throwFromErrno("Allocator: Cannot mmap.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
|
||||
/// No need for zero-fill, because mmap guarantees it.
|
||||
}
|
||||
@ -72,7 +75,7 @@ void * Allocator<clear_memory_>::alloc(size_t size, size_t alignment)
|
||||
buf = ::malloc(size);
|
||||
|
||||
if (nullptr == buf)
|
||||
DB::throwFromErrno("Allocator: Cannot malloc.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno("Allocator: Cannot malloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -80,7 +83,7 @@ void * Allocator<clear_memory_>::alloc(size_t size, size_t alignment)
|
||||
int res = posix_memalign(&buf, alignment, size);
|
||||
|
||||
if (0 != res)
|
||||
DB::throwFromErrno("Cannot allocate memory (posix_memalign)", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
|
||||
DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
|
||||
|
||||
if (clear_memory)
|
||||
memset(buf, 0, size);
|
||||
@ -97,7 +100,7 @@ void Allocator<clear_memory_>::free(void * buf, size_t size)
|
||||
if (size >= MMAP_THRESHOLD)
|
||||
{
|
||||
if (0 != munmap(buf, size))
|
||||
DB::throwFromErrno("Allocator: Cannot munmap.", DB::ErrorCodes::CANNOT_MUNMAP);
|
||||
DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -119,7 +122,7 @@ void * Allocator<clear_memory_>::realloc(void * buf, size_t old_size, size_t new
|
||||
buf = ::realloc(buf, new_size);
|
||||
|
||||
if (nullptr == buf)
|
||||
DB::throwFromErrno("Allocator: Cannot realloc.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
|
||||
if (clear_memory)
|
||||
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
|
||||
@ -130,7 +133,7 @@ void * Allocator<clear_memory_>::realloc(void * buf, size_t old_size, size_t new
|
||||
|
||||
buf = mremap(buf, old_size, new_size, MREMAP_MAYMOVE);
|
||||
if (MAP_FAILED == buf)
|
||||
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + DB::toString(old_size) + " to " + DB::toString(new_size) + " bytes.", DB::ErrorCodes::CANNOT_MREMAP);
|
||||
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
|
||||
|
||||
/// No need for zero-fill, because mmap guarantees it.
|
||||
}
|
||||
@ -144,7 +147,7 @@ void * Allocator<clear_memory_>::realloc(void * buf, size_t old_size, size_t new
|
||||
buf = ::realloc(buf, new_size);
|
||||
|
||||
if (nullptr == buf)
|
||||
DB::throwFromErrno("Allocator: Cannot realloc.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
|
||||
if (clear_memory)
|
||||
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/formatReadable.h>
|
||||
|
||||
/// Required for older Darwin builds, that lack definition of MAP_ANONYMOUS
|
||||
#ifndef MAP_ANONYMOUS
|
||||
@ -172,13 +173,13 @@ private:
|
||||
{
|
||||
ptr = mmap(address_hint, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
|
||||
if (MAP_FAILED == ptr)
|
||||
DB::throwFromErrno("Allocator: Cannot mmap.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
}
|
||||
|
||||
~Chunk()
|
||||
{
|
||||
if (ptr && 0 != munmap(ptr, size))
|
||||
DB::throwFromErrno("Allocator: Cannot munmap.", DB::ErrorCodes::CANNOT_MUNMAP);
|
||||
DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
|
||||
}
|
||||
|
||||
Chunk(Chunk && other) : ptr(other.ptr), size(other.size)
|
||||
|
@ -20,7 +20,16 @@ namespace DB
|
||||
MemoryTracker::~MemoryTracker()
|
||||
{
|
||||
if (peak)
|
||||
logPeakMemoryUsage();
|
||||
{
|
||||
try
|
||||
{
|
||||
logPeakMemoryUsage();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Exception in Logger, intentionally swallow.
|
||||
}
|
||||
}
|
||||
|
||||
/** This is needed for next memory tracker to be consistent with sum of all referring memory trackers.
|
||||
*
|
||||
|
@ -21,6 +21,27 @@ namespace ProfileEvents
|
||||
namespace zkutil
|
||||
{
|
||||
|
||||
|
||||
/// You should reinitialize ZooKeeper session in case of these errors
|
||||
inline bool isUnrecoverableErrorCode(int32_t zk_return_code)
|
||||
{
|
||||
return zk_return_code == ZINVALIDSTATE || zk_return_code == ZSESSIONEXPIRED || zk_return_code == ZSESSIONMOVED;
|
||||
}
|
||||
|
||||
/// Errors related with temporary network problems
|
||||
inline bool isTemporaryErrorCode(int32_t zk_return_code)
|
||||
{
|
||||
return zk_return_code == ZCONNECTIONLOSS || zk_return_code == ZOPERATIONTIMEOUT;
|
||||
}
|
||||
|
||||
/// Any error related with network or master election
|
||||
/// In case of these errors you should retry the query or reinitialize ZooKeeper session (see isUnrecoverable())
|
||||
inline bool isHardwareErrorCode(int32_t zk_return_code)
|
||||
{
|
||||
return isUnrecoverableErrorCode(zk_return_code) || isTemporaryErrorCode(zk_return_code);
|
||||
}
|
||||
|
||||
|
||||
class KeeperException : public DB::Exception
|
||||
{
|
||||
private:
|
||||
@ -29,35 +50,36 @@ private:
|
||||
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code) { incrementEventCounter(); }
|
||||
|
||||
public:
|
||||
KeeperException(const std::string & msg) : KeeperException(msg, ZOK, 0) {}
|
||||
explicit KeeperException(const std::string & msg) : KeeperException(msg, ZOK, 0) {}
|
||||
KeeperException(const std::string & msg, const int32_t code)
|
||||
: KeeperException(msg + " (" + zerror(code) + ")", code, 0) {}
|
||||
KeeperException(const int32_t code) : KeeperException(zerror(code), code, 0) {}
|
||||
explicit KeeperException(const int32_t code) : KeeperException(zerror(code), code, 0) {}
|
||||
KeeperException(const int32_t code, const std::string & path)
|
||||
: KeeperException(std::string{zerror(code)} + ", path: " + path, code, 0) {}
|
||||
|
||||
KeeperException(const KeeperException & exc) : DB::Exception(exc), code(exc.code) { incrementEventCounter(); }
|
||||
|
||||
const char * name() const throw() { return "zkutil::KeeperException"; }
|
||||
const char * className() const throw() { return "zkutil::KeeperException"; }
|
||||
KeeperException * clone() const { return new KeeperException(*this); }
|
||||
const char * name() const throw() override { return "zkutil::KeeperException"; }
|
||||
const char * className() const throw() override { return "zkutil::KeeperException"; }
|
||||
KeeperException * clone() const override { return new KeeperException(*this); }
|
||||
|
||||
/// при этих ошибках надо переинициализировать сессию с zookeeper
|
||||
/// You should reinitialize ZooKeeper session in case of these errors
|
||||
bool isUnrecoverable() const
|
||||
{
|
||||
return code == ZINVALIDSTATE || code == ZSESSIONEXPIRED || code == ZSESSIONMOVED;
|
||||
}
|
||||
|
||||
/// любая ошибка связанная с работой сети, перевыбором мастера
|
||||
/// при этих ошибках надо либо повторить запрос повторно, либо переинициализировать сессию (см. isUnrecoverable())
|
||||
bool isHardwareError() const
|
||||
{
|
||||
return isUnrecoverable() || code == ZCONNECTIONLOSS || code == ZOPERATIONTIMEOUT;
|
||||
return isUnrecoverableErrorCode(code);
|
||||
}
|
||||
|
||||
/// Errors related with temporary network problems
|
||||
bool isTemporaryError() const
|
||||
{
|
||||
return code == ZCONNECTIONLOSS || code == ZOPERATIONTIMEOUT;
|
||||
return isTemporaryErrorCode(code);
|
||||
}
|
||||
|
||||
/// Any error related with network or master election
|
||||
/// In case of these errors you should retry the query or reinitialize ZooKeeper session (see isUnrecoverable())
|
||||
bool isHardwareError() const
|
||||
{
|
||||
return isHardwareErrorCode(code);
|
||||
}
|
||||
|
||||
const int32_t code;
|
||||
|
@ -7,6 +7,7 @@
|
||||
#cmakedefine01 USE_RE2_ST
|
||||
#cmakedefine01 USE_VECTORCLASS
|
||||
#cmakedefine01 USE_RDKAFKA
|
||||
#cmakedefine01 USE_CAPNP
|
||||
#cmakedefine01 Poco_DataODBC_FOUND
|
||||
#cmakedefine01 Poco_MongoDB_FOUND
|
||||
#cmakedefine01 Poco_NetSSL_FOUND
|
||||
|
197
dbms/src/DataStreams/CapnProtoRowInputStream.cpp
Normal file
197
dbms/src/DataStreams/CapnProtoRowInputStream.cpp
Normal file
@ -0,0 +1,197 @@
|
||||
#if USE_CAPNP
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <DataStreams/CapnProtoRowInputStream.h>
|
||||
|
||||
#include <capnp/serialize.h>
|
||||
#include <capnp/dynamic.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/range/join.hpp>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
/// Builds the NestedField for the i-th column of `sample`: the column name is split on '.'
/// into tokens and `i` is remembered as the column position.
CapnProtoRowInputStream::NestedField split(const Block & sample, size_t i)
{
    const String column_name = sample.safeGetByPosition(i).name;

    // A single leading dot is not part of the path, e.g. ".msg" -> "msg"
    const size_t skip = (!column_name.empty() && column_name[0] == '.') ? 1 : 0;
    String path = column_name.substr(skip);

    CapnProtoRowInputStream::NestedField result;
    result.pos = i;
    boost::split(result.tokens, path, boost::is_any_of("."));
    return result;
}
|
||||
|
||||
|
||||
/** Converts a dynamically typed Cap'n Proto value into a Field.
  *
  * VOID becomes an empty Field, BOOL and ENUM become UInt64, INT becomes Int64, UINT becomes UInt64,
  * FLOAT becomes Float64, TEXT and DATA become String, LIST becomes an Array whose elements
  * are converted recursively.
  *
  * Throws Exception for UNKNOWN, STRUCT, CAPABILITY and ANY_POINTER values,
  * and for a type tag that matches none of the cases below.
  */
Field convertNodeToField(capnp::DynamicValue::Reader value)
{
    switch (value.getType())
    {
        case capnp::DynamicValue::UNKNOWN:
            throw Exception("Unknown field type");
        case capnp::DynamicValue::VOID:
            return Field();
        case capnp::DynamicValue::BOOL:
            return UInt64(value.as<bool>() ? 1 : 0);
        case capnp::DynamicValue::INT:
            return Int64((value.as<int64_t>()));
        case capnp::DynamicValue::UINT:
            return UInt64(value.as<uint64_t>());
        case capnp::DynamicValue::FLOAT:
            return Float64(value.as<double>());
        case capnp::DynamicValue::TEXT:
        {
            auto arr = value.as<capnp::Text>();
            return String(arr.begin(), arr.size());
        }
        case capnp::DynamicValue::DATA:
        {
            auto arr = value.as<capnp::Data>().asChars();
            return String(arr.begin(), arr.size());
        }
        case capnp::DynamicValue::LIST:
        {
            auto listValue = value.as<capnp::DynamicList>();
            Array res(listValue.size());
            for (auto i : kj::indices(listValue))
                res[i] = convertNodeToField(listValue[i]);
            return res;
        }
        case capnp::DynamicValue::ENUM:
            return UInt64(value.as<capnp::DynamicEnum>().getRaw());
        case capnp::DynamicValue::STRUCT:
            throw Exception("STRUCT type not supported, read individual fields instead");
        case capnp::DynamicValue::CAPABILITY:
            throw Exception("CAPABILITY type not supported");
        case capnp::DynamicValue::ANY_POINTER:
            throw Exception("ANY_POINTER type not supported");
    }

    /// Reached only when the type tag matches none of the cases above. Without this statement
    /// control would flow off the end of a non-void function, which is undefined behaviour.
    throw Exception("Unknown field type");
}
|
||||
|
||||
/// Looks up `field` by name in the struct schema `node`.
/// Returns the schema field, or throws Exception when the struct has no field with that name.
capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field)
{
    auto lookup = node.findFieldByName(field);

    KJ_IF_MAYBE(found, lookup)
    {
        return *found;
    }

    throw Exception("Field " + field + " doesn't exist in schema.");
}
|
||||
|
||||
void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader)
|
||||
{
|
||||
String last;
|
||||
size_t level = 0;
|
||||
capnp::StructSchema::Field parent;
|
||||
|
||||
for (const auto & field : sortedFields)
|
||||
{
|
||||
// Move to a different field in the same structure, keep parent
|
||||
if (level > 0 && field.tokens[level - 1] != last)
|
||||
{
|
||||
auto child = getFieldOrThrow(parent.getContainingStruct(), field.tokens[level - 1]);
|
||||
reader = child.getType().asStruct();
|
||||
actions.push_back({Action::POP});
|
||||
actions.push_back({Action::PUSH, child});
|
||||
}
|
||||
// Descend to a nested structure
|
||||
for (; level < field.tokens.size() - 1; ++level)
|
||||
{
|
||||
last = field.tokens[level];
|
||||
parent = getFieldOrThrow(reader, last);
|
||||
reader = parent.getType().asStruct();
|
||||
actions.push_back({Action::PUSH, parent});
|
||||
}
|
||||
// Read field from the structure
|
||||
actions.push_back({Action::READ, getFieldOrThrow(reader, field.tokens[level]), field.pos});
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses `schema_file`, takes the struct named `root_object` as the message root
/// and prepares the traversal plan for the columns of `sample_`.
CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_file, const String & root_object)
    : istr(istr_), sample(sample_), parser(std::make_shared<SchemaParser>())
{
    // Load the schema from disk and look up the root struct in it
    auto parsed_schema = parser->impl.parseDiskFile(schema_file, schema_file, {});
    root = parsed_schema.getNested(root_object).asStruct();

    /**
     * Columns usually refer to fields that live in various nested structures.
     * Collect the path of every column, then order the paths so that fields of the same
     * structure are adjacent and the nesting level never decreases; this keeps traversal simple.
     */
    NestedFieldList fields;
    for (size_t column = 0, total = sample.columns(); column < total; ++column)
        fields.push_back(split(sample, column));

    // Shorter paths first; paths of equal length in lexicographical order of their tokens
    const auto shallower_first = [](const NestedField & lhs, const NestedField & rhs)
    {
        if (lhs.tokens.size() != rhs.tokens.size())
            return lhs.tokens.size() < rhs.tokens.size();
        return lhs.tokens < rhs.tokens;
    };
    std::sort(fields.begin(), fields.end(), shallower_first);

    createActions(fields, root);
}
|
||||
|
||||
|
||||
/** Reads one Cap'n Proto message from the input buffer and appends one value to every column
  * of `block` that has a READ action.
  *
  * Returns false when the input is exhausted (nothing is read), true otherwise.
  *
  * The message is parsed in place when the buffer already holds all of it; otherwise it is first
  * copied into a contiguous heap array with readStrict().
  */
bool CapnProtoRowInputStream::read(Block & block)
{
    if (istr.eof())
        return false;

    // Read from underlying buffer directly
    auto buf = istr.buffer();
    auto base = reinterpret_cast<const capnp::word *>(istr.position());

    // kj::arrayPtr() expects a number of elements (words), while the buffer is measured in bytes.
    // Passing the byte count would claim several times more readable memory than the buffer holds.
    const size_t available_words = (buf.size() - istr.offset()) / sizeof(capnp::word);

    // Check if there's enough words in the buffer to read the full message
    kj::Array<capnp::word> heap_array;
    auto array = kj::arrayPtr(base, available_words);
    auto expected_words = capnp::expectedSizeInWordsFromPrefix(array);
    if (expected_words > array.size())
    {
        // We'll need to reassemble the message in a contiguous buffer
        // NOTE(review): if the prefix is too short to hold the whole segment table, expected_words
        // is presumably only an estimate and the array may still be too small - confirm.
        heap_array = kj::heapArray<capnp::word>(expected_words);
        istr.readStrict(heap_array.asChars().begin(), heap_array.asChars().size());
        array = heap_array.asPtr();
    }

    capnp::FlatArrayMessageReader msg(array);
    std::vector<capnp::DynamicStruct::Reader> stack;
    stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));

    // Replay the traversal plan; the top of the stack is the struct currently being read
    for (const auto & action : actions)
    {
        switch (action.type)
        {
            case Action::READ:
            {
                auto & col = block.getByPosition(action.column);
                Field value = convertNodeToField(stack.back().get(action.field));
                col.column->insert(value);
                break;
            }
            case Action::POP:
                stack.pop_back();
                break;
            case Action::PUSH:
                stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>());
                break;
        }
    }

    // Advance buffer position if used directly
    // (on the heap path readStrict() has already consumed the bytes)
    if (heap_array.size() == 0)
    {
        auto parsed = (msg.getEnd() - base) * sizeof(capnp::word);
        istr.position() += parsed;
    }

    return true;
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
68
dbms/src/DataStreams/CapnProtoRowInputStream.h
Normal file
68
dbms/src/DataStreams/CapnProtoRowInputStream.h
Normal file
@ -0,0 +1,68 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <DataStreams/IRowInputStream.h>
|
||||
|
||||
#include <capnp/schema-parser.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ReadBuffer;
|
||||
|
||||
/** A stream for reading messages in Cap'n Proto format in given schema.
|
||||
* Like Protocol Buffers and Thrift (but unlike JSON or MessagePack),
|
||||
* Cap'n Proto messages are strongly-typed and not self-describing.
|
||||
* The schema in this case cannot be compiled in, so it uses a runtime schema parser.
|
||||
* See https://capnproto.org/cxx.html
|
||||
*/
|
||||
class CapnProtoRowInputStream : public IRowInputStream
|
||||
{
|
||||
public:
|
||||
struct NestedField
|
||||
{
|
||||
std::vector<std::string> tokens;
|
||||
size_t pos;
|
||||
};
|
||||
using NestedFieldList = std::vector<NestedField>;
|
||||
|
||||
/** schema_file - location of the capnproto schema, e.g. "schema.canpn"
|
||||
* root_object - name to the root object, e.g. "Message"
|
||||
*/
|
||||
CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_file, const String & root_object);
|
||||
|
||||
bool read(Block & block) override;
|
||||
|
||||
private:
|
||||
// Build a traversal plan from a sorted list of fields
|
||||
void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader);
|
||||
|
||||
/* Action for state machine for traversing nested structures. */
|
||||
struct Action
|
||||
{
|
||||
enum Type { POP, PUSH, READ };
|
||||
Type type;
|
||||
capnp::StructSchema::Field field;
|
||||
size_t column;
|
||||
};
|
||||
|
||||
// Wrapper for classes that could throw in destructor
|
||||
// https://github.com/capnproto/capnproto/issues/553
|
||||
template <typename T>
|
||||
struct DestructorCatcher
|
||||
{
|
||||
T impl;
|
||||
template <typename ... Arg>
|
||||
DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {}
|
||||
~DestructorCatcher() noexcept try { } catch (...) { }
|
||||
};
|
||||
using SchemaParser = DestructorCatcher<capnp::SchemaParser>;
|
||||
|
||||
ReadBuffer & istr;
|
||||
const Block sample;
|
||||
std::shared_ptr<SchemaParser> parser;
|
||||
capnp::StructSchema root;
|
||||
std::vector<Action> actions;
|
||||
};
|
||||
|
||||
}
|
@ -1,3 +1,4 @@
|
||||
#include <Common/config.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <DataStreams/NativeBlockInputStream.h>
|
||||
#include <DataStreams/NativeBlockOutputStream.h>
|
||||
@ -30,6 +31,11 @@
|
||||
#include <DataStreams/FormatFactory.h>
|
||||
#include <DataStreams/SquashingBlockOutputStream.h>
|
||||
#include <DataTypes/FormatSettingsJSON.h>
|
||||
#if USE_CAPNP
|
||||
#include <DataStreams/CapnProtoRowInputStream.h>
|
||||
#endif
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -92,6 +98,18 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
|
||||
{
|
||||
return wrap_row_stream(std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings.input_format_skip_unknown_fields));
|
||||
}
|
||||
#if USE_CAPNP
|
||||
else if (name == "CapnProto")
|
||||
{
|
||||
std::vector<String> tokens;
|
||||
auto schema_and_root = settings.format_schema.toString();
|
||||
boost::split(tokens, schema_and_root, boost::is_any_of(":"));
|
||||
if (tokens.size() != 2)
|
||||
throw Exception("Format CapnProto requires 'format_schema' setting to have schema_file:root_object format, e.g. 'schema.capnp:Message'");
|
||||
|
||||
return wrap_row_stream(std::make_shared<CapnProtoRowInputStream>(buf, sample, tokens[0], tokens[1]));
|
||||
}
|
||||
#endif
|
||||
else if (name == "TabSeparatedRaw"
|
||||
|| name == "TSVRaw"
|
||||
|| name == "BlockTabSeparated"
|
||||
|
69
dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
Normal file
69
dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
Normal file
@ -0,0 +1,69 @@
|
||||
#include "PushingToViewsBlockOutputStream.h"
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Prepares writing into `database`.`table` and into every materialized view that depends on it.
/// With `no_destination` set, nothing is written to the table itself, only to the views.
PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(String database, String table, const Context & context_,
    const ASTPtr & query_ptr_, bool no_destination)
    : context(context_), query_ptr(query_ptr_)
{
    storage = context.getTable(database, table);

    /** TODO This line is very important: for any insertion into the table, one of the streams must own the lock.
      * At the moment every insertion goes through PushingToViewsBlockOutputStream,
      * yet this is clearly not the best place for that functionality.
      */
    addTableLock(storage->lockStructure(true, __PRETTY_FUNCTION__));

    Dependencies dependent_views = context.getDependencies(database, table);

    if (!dependent_views.empty())
    {
        /// Insertions into materialized views use their own copy of the context
        views_context = std::make_unique<Context>(context);
        // If the main insertion is Ok, insertions into MV must not be deduplicated
        views_context->getSettingsRef().insert_deduplicate = false;

        for (const auto & view_name : dependent_views)
        {
            auto view_storage = context.getTable(view_name.first, view_name.second);
            auto & materialized_view = dynamic_cast<const StorageMaterializedView &>(*view_storage);

            auto inner_query = materialized_view.getInnerQuery();
            auto view_stream = std::make_shared<PushingToViewsBlockOutputStream>(view_name.first, view_name.second, *views_context, ASTPtr());

            views.emplace_back(std::move(inner_query), std::move(view_stream));
        }
    }

    /* The destination table is skipped when the flag is set */
    if (no_destination)
        return;

    output = storage->write(query_ptr, context.getSettingsRef());
    replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get());
}
|
||||
|
||||
|
||||
void PushingToViewsBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
if (output)
|
||||
output->write(block);
|
||||
|
||||
/// Don't process materialized views if this block is duplicate
|
||||
if (replicated_output && replicated_output->lastBlockIsDuplicate())
|
||||
return;
|
||||
|
||||
/// Insert data into materialized views only after successful insert into main table
|
||||
for (auto & view : views)
|
||||
{
|
||||
BlockInputStreamPtr from = std::make_shared<OneBlockInputStream>(block);
|
||||
InterpreterSelectQuery select(view.first, *views_context, QueryProcessingStage::Complete, 0, from);
|
||||
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
|
||||
copyData(*data, *view.second);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -11,47 +11,17 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ReplicatedMergeTreeBlockOutputStream;
|
||||
|
||||
|
||||
/** Writes data to the specified table and to all dependent materialized views.
|
||||
*/
|
||||
class PushingToViewsBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
PushingToViewsBlockOutputStream(String database, String table, const Context & context_, const ASTPtr & query_ptr_, bool no_destination = false)
|
||||
: context(context_), query_ptr(query_ptr_)
|
||||
{
|
||||
storage = context.getTable(database, table);
|
||||
PushingToViewsBlockOutputStream(String database, String table, const Context & context_, const ASTPtr & query_ptr_, bool no_destination = false);
|
||||
|
||||
/** TODO This is a very important line. At any insertion into the table one of streams should own lock.
|
||||
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
|
||||
* but it's clear that here is not the best place for this functionality.
|
||||
*/
|
||||
addTableLock(storage->lockStructure(true, __PRETTY_FUNCTION__));
|
||||
|
||||
Dependencies dependencies = context.getDependencies(database, table);
|
||||
for (const auto & database_table : dependencies)
|
||||
views.emplace_back(
|
||||
dynamic_cast<const StorageMaterializedView &>(*context.getTable(database_table.first, database_table.second)).getInnerQuery(),
|
||||
std::make_shared<PushingToViewsBlockOutputStream>(database_table.first, database_table.second, context, ASTPtr()));
|
||||
|
||||
/* Do not push to destination table if the flag is set */
|
||||
if (!no_destination)
|
||||
output = storage->write(query_ptr, context.getSettingsRef());
|
||||
}
|
||||
|
||||
void write(const Block & block) override
|
||||
{
|
||||
for (auto & view : views)
|
||||
{
|
||||
BlockInputStreamPtr from = std::make_shared<OneBlockInputStream>(block);
|
||||
InterpreterSelectQuery select(view.first, context, QueryProcessingStage::Complete, 0, from);
|
||||
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
|
||||
copyData(*data, *view.second);
|
||||
}
|
||||
|
||||
if (output)
|
||||
output->write(block);
|
||||
}
|
||||
void write(const Block & block) override;
|
||||
|
||||
void flush() override
|
||||
{
|
||||
@ -72,11 +42,16 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
StoragePtr storage;
|
||||
BlockOutputStreamPtr output;
|
||||
ReplicatedMergeTreeBlockOutputStream * replicated_output = nullptr;
|
||||
|
||||
const Context & context;
|
||||
ASTPtr query_ptr;
|
||||
|
||||
std::vector<std::pair<ASTPtr, BlockOutputStreamPtr>> views;
|
||||
std::unique_ptr<Context> views_context;
|
||||
};
|
||||
|
||||
|
||||
|
@ -100,7 +100,7 @@ Block createBlockWithNestedColumns(const Block & block, ColumnNumbers args, size
|
||||
if (col.type->isNullable())
|
||||
{
|
||||
bool is_const = col.column->isConst();
|
||||
auto const_col = static_cast<const ColumnConst *>(col.column.get());
|
||||
auto const_col = typeid_cast<const ColumnConst *>(col.column.get());
|
||||
|
||||
if (is_const && !const_col->getDataColumn().isNullable())
|
||||
throw Exception("Column at position " + toString(i + 1) + " with type " + col.type->getName() +
|
||||
|
@ -204,17 +204,12 @@ bool defaultImplementationForNulls(
|
||||
const ColumnWithTypeAndName & source_col = temporary_block.getByPosition(result);
|
||||
ColumnWithTypeAndName & dest_col = block.getByPosition(result);
|
||||
|
||||
if (source_col.column->isConst())
|
||||
dest_col.column = source_col.column;
|
||||
else
|
||||
{
|
||||
/// Initialize the result column.
|
||||
ColumnPtr null_map = std::make_shared<ColumnUInt8>(block.rows(), 0);
|
||||
dest_col.column = std::make_shared<ColumnNullable>(source_col.column, null_map);
|
||||
/// Initialize the result column.
|
||||
ColumnPtr null_map = std::make_shared<ColumnUInt8>(block.rows(), 0);
|
||||
dest_col.column = std::make_shared<ColumnNullable>(source_col.column, null_map);
|
||||
|
||||
/// Deduce the null map of the result from the null maps of the nullable columns.
|
||||
createNullMap(block, args, result);
|
||||
}
|
||||
/// Deduce the null map of the result from the null maps of the nullable columns.
|
||||
createNullMap(block, args, result);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -186,6 +186,9 @@ struct Settings
|
||||
/** The maximum number of concurrent requests per user. */ \
|
||||
M(SettingUInt64, max_concurrent_queries_for_user, 0) \
|
||||
\
|
||||
/** For INSERT queries in the replicated table, specifies that deduplication of inserted blocks should be performed */ \
|
||||
M(SettingBool, insert_deduplicate, true) \
|
||||
\
|
||||
/** For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled. */ \
|
||||
M(SettingUInt64, insert_quorum, 0) \
|
||||
M(SettingMilliseconds, insert_quorum_timeout, 600000) \
|
||||
|
@ -157,10 +157,13 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
|
||||
|
||||
MergeTreeData::DataPartPtr Service::findPart(const String & name)
|
||||
{
|
||||
MergeTreeData::DataPartPtr part = data.getPartIfExists(name);
|
||||
/// It is important to include PreCommitted parts here
|
||||
/// because the part could actually be committed to ZooKeeper while the response from ZooKeeper to the server is delayed
|
||||
auto part = data.getPartIfExists(name, {MergeTreeDataPart::State::PreCommitted, MergeTreeDataPart::State::Committed});
|
||||
if (part)
|
||||
return part;
|
||||
throw Exception("No part " + name + " in table");
|
||||
|
||||
throw Exception("No part " + name + " in table", ErrorCodes::NO_SUCH_DATA_PART);
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartPtr Service::findShardedPart(const String & name, size_t shard_no)
|
||||
|
@ -41,6 +41,8 @@
|
||||
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
|
||||
#include <boost/range/adaptor/filtered.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <iomanip>
|
||||
#include <thread>
|
||||
@ -104,7 +106,6 @@ MergeTreeData::MergeTreeData(
|
||||
database_name(database_), table_name(table_),
|
||||
full_path(full_path_), columns(columns_),
|
||||
broken_part_callback(broken_part_callback_),
|
||||
parts_clean_callback(parts_clean_callback_ ? parts_clean_callback_ : [this](){ clearOldParts(); }),
|
||||
log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
|
||||
{
|
||||
merging_params.check(*columns);
|
||||
@ -360,13 +361,13 @@ String MergeTreeData::MergingParams::getModeName() const
|
||||
{
|
||||
switch (mode)
|
||||
{
|
||||
case Ordinary: return "";
|
||||
case Collapsing: return "Collapsing";
|
||||
case Summing: return "Summing";
|
||||
case Aggregating: return "Aggregating";
|
||||
case Unsorted: return "Unsorted";
|
||||
case Ordinary: return "";
|
||||
case Collapsing: return "Collapsing";
|
||||
case Summing: return "Summing";
|
||||
case Aggregating: return "Aggregating";
|
||||
case Unsorted: return "Unsorted";
|
||||
case Replacing: return "Replacing";
|
||||
case Graphite: return "Graphite";
|
||||
case Graphite: return "Graphite";
|
||||
|
||||
default:
|
||||
throw Exception("Unknown mode of operation for MergeTreeData: " + toString<int>(mode), ErrorCodes::LOGICAL_ERROR);
|
||||
@ -376,10 +377,10 @@ String MergeTreeData::MergingParams::getModeName() const
|
||||
|
||||
Int64 MergeTreeData::getMaxDataPartIndex()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
|
||||
std::lock_guard<std::mutex> lock_all(data_parts_mutex);
|
||||
|
||||
Int64 max_block_id = 0;
|
||||
for (const auto & part : all_data_parts)
|
||||
for (const auto & part : data_parts)
|
||||
max_block_id = std::max(max_block_id, part->info.max_block);
|
||||
|
||||
return max_block_id;
|
||||
@ -391,10 +392,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
LOG_DEBUG(log, "Loading data parts");
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
|
||||
|
||||
data_parts.clear();
|
||||
all_data_parts.clear();
|
||||
|
||||
Strings part_file_names;
|
||||
Poco::DirectoryIterator end;
|
||||
@ -494,6 +492,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
}
|
||||
|
||||
part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();
|
||||
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
|
||||
part->state = DataPartState::Committed;
|
||||
|
||||
data_parts.insert(part);
|
||||
}
|
||||
@ -507,18 +507,17 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
for (auto & part : broken_parts_to_detach)
|
||||
part->renameAddPrefix(true, "");
|
||||
|
||||
all_data_parts = data_parts;
|
||||
|
||||
/// Delete from the set of current parts those parts that are covered by another part (those parts that
|
||||
/// were merged), but that for some reason are still not deleted from the filesystem.
|
||||
/// Deletion of files will be performed later in the clearOldParts() method.
|
||||
|
||||
if (data_parts.size() >= 2)
|
||||
{
|
||||
DataParts::iterator prev_jt = data_parts.begin();
|
||||
DataParts::iterator curr_jt = prev_jt;
|
||||
++curr_jt;
|
||||
while (curr_jt != data_parts.end())
|
||||
auto committed_parts = getDataPartsRange({DataPartState::Committed});
|
||||
auto prev_jt = committed_parts.begin();
|
||||
auto curr_jt = std::next(prev_jt);
|
||||
|
||||
while (curr_jt != committed_parts.end())
|
||||
{
|
||||
/// Don't consider data parts belonging to different partitions.
|
||||
if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id)
|
||||
@ -531,14 +530,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
if ((*curr_jt)->contains(**prev_jt))
|
||||
{
|
||||
(*prev_jt)->remove_time = (*prev_jt)->modification_time;
|
||||
data_parts.erase(prev_jt);
|
||||
(*prev_jt)->state = DataPartState::Outdated; /// prev_jt becomes invalid here
|
||||
prev_jt = curr_jt;
|
||||
++curr_jt;
|
||||
}
|
||||
else if ((*prev_jt)->contains(**curr_jt))
|
||||
{
|
||||
(*curr_jt)->remove_time = (*curr_jt)->modification_time;
|
||||
data_parts.erase(curr_jt++);
|
||||
(*curr_jt)->state = DataPartState::Outdated; /// curr_jt becomes invalid here
|
||||
++curr_jt;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -620,19 +620,18 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
|
||||
time_t now = time(nullptr);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_all_parts(all_data_parts_mutex);
|
||||
std::lock_guard<std::mutex> lock_parts(data_parts_mutex);
|
||||
|
||||
for (auto it = all_data_parts.begin(); it != all_data_parts.end();)
|
||||
for (auto it = data_parts.begin(); it != data_parts.end(); ++it)
|
||||
{
|
||||
if (it->unique() && /// After this ref_count cannot increase.
|
||||
if ((*it)->state == DataPartState::Outdated &&
|
||||
it->unique() && /// Grab only parts that are not used by anyone (SELECTs for example)
|
||||
(*it)->remove_time < now &&
|
||||
now - (*it)->remove_time > settings.old_parts_lifetime.totalSeconds())
|
||||
{
|
||||
(*it)->state = DataPartState::Deleting;
|
||||
res.push_back(*it);
|
||||
all_data_parts.erase(it++);
|
||||
}
|
||||
else
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
@ -643,10 +642,33 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeData::addOldParts(const MergeTreeData::DataPartsVector & parts)
|
||||
void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector & parts)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(all_data_parts_mutex);
|
||||
all_data_parts.insert(parts.begin(), parts.end());
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
for (auto & part : parts)
|
||||
{
|
||||
/// We should modify it under data_parts_mutex
|
||||
part->assertState({DataPartState::Deleting});
|
||||
part->state = DataPartState::Outdated;
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
/// TODO: use data_parts iterators instead of pointers
|
||||
for (auto & part : parts)
|
||||
{
|
||||
if (part->state != DataPartState::Deleting)
|
||||
throw Exception("An attempt to delete part " + part->getNameWithState() + " with unexpected state", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto it = data_parts.find(part);
|
||||
if (it == data_parts.end())
|
||||
throw Exception("Deleting data part " + part->name + " is not exist", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
data_parts.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeData::clearOldParts()
|
||||
@ -684,12 +706,10 @@ void MergeTreeData::dropAllData()
|
||||
LOG_TRACE(log, "dropAllData: waiting for locks.");
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
|
||||
|
||||
LOG_TRACE(log, "dropAllData: removing data from memory.");
|
||||
|
||||
data_parts.clear();
|
||||
all_data_parts.clear();
|
||||
column_sizes.clear();
|
||||
|
||||
context.dropCaches();
|
||||
@ -872,7 +892,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
|
||||
const IDataType * observed_type;
|
||||
if (is_nullable)
|
||||
{
|
||||
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(*column.type);
|
||||
auto & nullable_type = static_cast<const DataTypeNullable &>(*column.type);
|
||||
observed_type = nullable_type.getNestedType().get();
|
||||
}
|
||||
else
|
||||
@ -1288,12 +1308,16 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
|
||||
+ " existing part(s) (including " + removed[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
|
||||
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
|
||||
{
|
||||
if (out_transaction && out_transaction->data)
|
||||
throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
part->assertState({DataPartState::Temporary});
|
||||
|
||||
DataPartsVector replaced;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
@ -1321,32 +1345,40 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
|
||||
|
||||
LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << new_name << ".");
|
||||
|
||||
if (data_parts.count(part))
|
||||
throw Exception("Part " + new_name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
|
||||
|
||||
bool in_all_data_parts;
|
||||
auto it_duplicate = data_parts.find(part);
|
||||
if (it_duplicate != data_parts.end())
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
|
||||
in_all_data_parts = all_data_parts.count(part) != 0;
|
||||
}
|
||||
/// New part can be removed from data_parts but not from filesystem and ZooKeeper
|
||||
if (in_all_data_parts)
|
||||
clearOldPartsAndRemoveFromZK();
|
||||
String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
|
||||
if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting}))
|
||||
message += ", but it will be deleted soon";
|
||||
|
||||
/// Rename the part.
|
||||
part->renameTo(new_name);
|
||||
part->is_temp = false;
|
||||
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
|
||||
}
|
||||
|
||||
/// Rename the part only in memory. It will be renamed on disk only if all checks pass.
|
||||
/// This lets us maintain the invariant: if a non-temporary part is in the filesystem, then it is in data_parts
|
||||
part->name = new_name;
|
||||
|
||||
bool obsolete = false; /// Is the part covered by some other part?
|
||||
/// Is the part covered by some other part?
|
||||
bool obsolete = false;
|
||||
|
||||
auto check_replacing_part_state = [&] (const DataPartPtr & cur_part)
|
||||
{
|
||||
cur_part->assertState({DataPartState::PreCommitted, DataPartState::Committed});
|
||||
if (cur_part->state == DataPartState::PreCommitted)
|
||||
throw Exception("Could not add part " + new_name + " while replacing part " + cur_part->name + " is in pre-committed state", ErrorCodes::LOGICAL_ERROR);
|
||||
};
|
||||
|
||||
/// Don't consider parts going to be deleted
|
||||
auto active_parts = getDataPartsRange({DataPartState::Committed, DataPartState::PreCommitted});
|
||||
/// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
|
||||
auto it_middle = active_parts.convert(data_parts.lower_bound(part));
|
||||
|
||||
/// Parts contained in the part are consecutive in data_parts, intersecting the insertion place
|
||||
/// for the part itself.
|
||||
auto it = data_parts.lower_bound(part);
|
||||
/// Go to the left.
|
||||
while (it != data_parts.begin())
|
||||
for (auto it = it_middle; it != active_parts.begin();)
|
||||
{
|
||||
--it;
|
||||
|
||||
if (!part->contains(**it))
|
||||
{
|
||||
if ((*it)->contains(*part))
|
||||
@ -1354,41 +1386,79 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
|
||||
++it;
|
||||
break;
|
||||
}
|
||||
|
||||
check_replacing_part_state(*it);
|
||||
replaced.push_back(*it);
|
||||
(*it)->remove_time = time(nullptr);
|
||||
removePartContributionToColumnSizes(*it);
|
||||
data_parts.erase(it++); /// Yes, ++, not --.
|
||||
// replaced.push_back(*it);
|
||||
// (*it)->remove_time = time(nullptr);
|
||||
// (*it)->state = replaced_parts_state;
|
||||
// removePartContributionToColumnSizes(*it);
|
||||
// data_parts.erase(it++); /// Yes, ++, not --.
|
||||
}
|
||||
std::reverse(replaced.begin(), replaced.end()); /// Parts must be in ascending order.
|
||||
|
||||
/// Parts must be in ascending order.
|
||||
std::reverse(replaced.begin(), replaced.end());
|
||||
|
||||
/// Go to the right.
|
||||
while (it != data_parts.end())
|
||||
for (auto it = it_middle; it != active_parts.end();)
|
||||
{
|
||||
if ((*it)->name == part->name)
|
||||
throw Exception("Unexpected duplicate part " + part->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (!part->contains(**it))
|
||||
{
|
||||
if ((*it)->name == part->name || (*it)->contains(*part))
|
||||
if ((*it)->contains(*part))
|
||||
obsolete = true;
|
||||
break;
|
||||
}
|
||||
|
||||
check_replacing_part_state(*it);
|
||||
replaced.push_back(*it);
|
||||
(*it)->remove_time = time(nullptr);
|
||||
removePartContributionToColumnSizes(*it);
|
||||
data_parts.erase(it++);
|
||||
++it;
|
||||
// replaced.push_back(*it);
|
||||
// (*it)->remove_time = time(nullptr);
|
||||
// (*it)->state = replaced_parts_state;
|
||||
// removePartContributionToColumnSizes(*it);
|
||||
// data_parts.erase(it++);
|
||||
}
|
||||
|
||||
if (obsolete)
|
||||
{
|
||||
LOG_WARNING(log, "Obsolete part " << part->name << " added");
|
||||
part->remove_time = time(nullptr);
|
||||
/// In case of failure, we want to delete the part from the filesystem immediately (to avoid any conflicts)
|
||||
part->is_temp = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
data_parts.insert(part);
|
||||
addPartContributionToColumnSizes(part);
|
||||
}
|
||||
/// Now we can rename part on filesystem
|
||||
part->is_temp = false;
|
||||
part->renameTo(new_name);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
|
||||
all_data_parts.insert(part);
|
||||
if (!out_transaction)
|
||||
{
|
||||
/// Ordinary MergeTree engines (they don't use out_transaction) commit parts immediately
|
||||
part->state = DataPartState::Committed;
|
||||
addPartContributionToColumnSizes(part);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Whereas ReplicatedMergeTree uses intermediate PreCommitted state
|
||||
part->state = DataPartState::PreCommitted;
|
||||
}
|
||||
|
||||
data_parts.insert(part);
|
||||
|
||||
auto current_time = time(nullptr);
|
||||
for (auto & replacing_part : replaced)
|
||||
{
|
||||
if (!out_transaction)
|
||||
{
|
||||
replacing_part->remove_time = current_time;
|
||||
replacing_part->state = DataPartState::Outdated;
|
||||
removePartContributionToColumnSizes(replacing_part);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1396,63 +1466,79 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
|
||||
{
|
||||
out_transaction->data = this;
|
||||
out_transaction->parts_to_add_on_rollback = replaced;
|
||||
out_transaction->parts_to_remove_on_rollback = DataPartsVector(1, part);
|
||||
out_transaction->parts_to_remove_on_rollback = {part};
|
||||
}
|
||||
|
||||
return replaced;
|
||||
}
|
||||
|
||||
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout)
|
||||
void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
for (const DataPartPtr & part : remove)
|
||||
for (auto & part : remove)
|
||||
{
|
||||
part->remove_time = clear_without_timeout ? 0 : time(nullptr);
|
||||
if (!data_parts.count(part))
|
||||
throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (data_parts.erase(part))
|
||||
removePartContributionToColumnSizes(part);
|
||||
part->assertState({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
|
||||
}
|
||||
|
||||
for (const DataPartPtr & part : add)
|
||||
auto remove_time = clear_without_timeout ? 0 : time(nullptr);
|
||||
for (const DataPartPtr & part : remove)
|
||||
{
|
||||
if (data_parts.insert(part).second)
|
||||
addPartContributionToColumnSizes(part);
|
||||
if (part->state == DataPartState::Committed)
|
||||
removePartContributionToColumnSizes(part);
|
||||
part->state = DataPartState::Outdated;
|
||||
part->remove_time = remove_time;
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)
|
||||
|
||||
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, const String & prefix, bool restore_covered,
|
||||
bool move_to_detached)
|
||||
{
|
||||
LOG_INFO(log, "Renaming " << part->relative_path << " to " << prefix << part->name << " and detaching it.");
|
||||
LOG_INFO(log, "Renaming " << part_to_detach->relative_path << " to " << prefix << part_to_detach->name << " and detaching it.");
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
|
||||
//std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
|
||||
|
||||
if (!all_data_parts.erase(part))
|
||||
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
|
||||
auto it_part = data_parts.find(part_to_detach);
|
||||
if (it_part == data_parts.end())
|
||||
throw Exception("No such data part " + part_to_detach->getNameWithState(), ErrorCodes::NO_SUCH_DATA_PART);
|
||||
|
||||
/// What if part_to_detach is a reference to *it_part? Make a new owner just in case.
|
||||
auto part = *it_part;
|
||||
|
||||
removePartContributionToColumnSizes(part);
|
||||
data_parts.erase(part);
|
||||
part->state = DataPartState::Deleting;
|
||||
if (move_to_detached || !prefix.empty())
|
||||
part->renameAddPrefix(move_to_detached, prefix);
|
||||
|
||||
if (restore_covered)
|
||||
{
|
||||
auto it = all_data_parts.lower_bound(part);
|
||||
auto suitable_parts = getDataPartsRange({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
|
||||
auto it = suitable_parts.convert(data_parts.lower_bound(part));
|
||||
|
||||
Strings restored;
|
||||
bool error = false;
|
||||
|
||||
Int64 pos = part->info.min_block;
|
||||
|
||||
if (it != all_data_parts.begin())
|
||||
if (it != suitable_parts.begin())
|
||||
{
|
||||
--it;
|
||||
if (part->contains(**it))
|
||||
{
|
||||
if ((*it)->info.min_block != part->info.min_block)
|
||||
error = true;
|
||||
data_parts.insert(*it);
|
||||
addPartContributionToColumnSizes(*it);
|
||||
|
||||
if ((*it)->state != DataPartState::Committed)
|
||||
{
|
||||
addPartContributionToColumnSizes(*it);
|
||||
(*it)->state = DataPartState::Committed;
|
||||
}
|
||||
|
||||
pos = (*it)->info.max_block + 1;
|
||||
restored.push_back((*it)->name);
|
||||
}
|
||||
@ -1463,14 +1549,19 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String &
|
||||
else
|
||||
error = true;
|
||||
|
||||
for (; it != all_data_parts.end() && part->contains(**it); ++it)
|
||||
for (; it != suitable_parts.end() && part->contains(**it); ++it)
|
||||
{
|
||||
if ((*it)->info.min_block < pos)
|
||||
continue;
|
||||
if ((*it)->info.min_block > pos)
|
||||
error = true;
|
||||
data_parts.insert(*it);
|
||||
addPartContributionToColumnSizes(*it);
|
||||
|
||||
if ((*it)->state != DataPartState::Committed)
|
||||
{
|
||||
addPartContributionToColumnSizes(*it);
|
||||
(*it)->state = DataPartState::Committed;
|
||||
}
|
||||
|
||||
pos = (*it)->info.max_block + 1;
|
||||
restored.push_back((*it)->name);
|
||||
}
|
||||
@ -1488,39 +1579,18 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String &
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeData::detachPartInPlace(const DataPartPtr & part)
|
||||
{
|
||||
renameAndDetachPart(part, "", false, false);
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getDataParts() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
return data_parts;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
return DataPartsVector(std::begin(data_parts), std::end(data_parts));
|
||||
}
|
||||
|
||||
size_t MergeTreeData::getTotalActiveSizeInBytes() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
size_t res = 0;
|
||||
for (auto & part : data_parts)
|
||||
for (auto & part : getDataPartsRange({DataPartState::Committed}))
|
||||
res += part->size_in_bytes;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getAllDataParts() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(all_data_parts_mutex);
|
||||
return all_data_parts;
|
||||
}
|
||||
|
||||
size_t MergeTreeData::getMaxPartsCountForPartition() const
|
||||
{
|
||||
@ -1530,7 +1600,7 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const
|
||||
size_t cur_count = 0;
|
||||
const String * cur_partition_id = nullptr;
|
||||
|
||||
for (const auto & part : data_parts)
|
||||
for (const auto & part : getDataPartsRange({DataPartState::Committed}))
|
||||
{
|
||||
if (cur_partition_id && part->info.partition_id == *cur_partition_id)
|
||||
{
|
||||
@ -1586,9 +1656,10 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
/// The part can be covered only by the previous or the next one in data_parts.
|
||||
auto it = data_parts.lower_bound(part_info);
|
||||
auto committed_parts = getDataPartsRange({DataPartState::Committed});
|
||||
auto it = committed_parts.convert(data_parts.lower_bound(part_info));
|
||||
|
||||
if (it != data_parts.end())
|
||||
if (it != committed_parts.end())
|
||||
{
|
||||
if ((*it)->name == part_name)
|
||||
return *it;
|
||||
@ -1596,7 +1667,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
|
||||
return *it;
|
||||
}
|
||||
|
||||
if (it != data_parts.begin())
|
||||
if (it != committed_parts.begin())
|
||||
{
|
||||
--it;
|
||||
if ((*it)->info.contains(part_info))
|
||||
@ -1606,13 +1677,16 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name)
|
||||
|
||||
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name, const MergeTreeData::DataPartStates & valid_states)
|
||||
{
|
||||
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
|
||||
|
||||
std::lock_guard<std::mutex> lock(all_data_parts_mutex);
|
||||
auto it = all_data_parts.lower_bound(part_info);
|
||||
if (it != all_data_parts.end() && (*it)->name == part_name)
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
auto filtered_parts = getDataPartsRange(valid_states);
|
||||
auto it = filtered_parts.convert(data_parts.find(part_info));
|
||||
if (it != filtered_parts.end() && (*it)->name == part_name)
|
||||
return *it;
|
||||
|
||||
return nullptr;
|
||||
@ -1666,7 +1740,8 @@ void MergeTreeData::calculateColumnSizesImpl()
|
||||
{
|
||||
column_sizes.clear();
|
||||
|
||||
for (const auto & part : data_parts)
|
||||
/// Take into account only committed parts
|
||||
for (const auto & part : getDataPartsRange({DataPartState::Committed}))
|
||||
addPartContributionToColumnSizes(part);
|
||||
}
|
||||
|
||||
@ -1893,6 +1968,70 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
|
||||
return partition_id;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states) const
|
||||
{
|
||||
DataPartsVector res;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const MergeTreeData::DataPartStates & affordable_states,
|
||||
MergeTreeData::DataPartStateVector & out_states_snapshot) const
|
||||
{
|
||||
DataPartsVector res;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states));
|
||||
|
||||
out_states_snapshot.resize(res.size());
|
||||
for (size_t i = 0; i < res.size(); ++i)
|
||||
out_states_snapshot[i] = res[i]->state;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
|
||||
{
|
||||
DataParts res;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::copy_if(data_parts.begin(), data_parts.end(), std::inserter(res, res.end()), DataPart::getStatesFilter(affordable_states));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getDataParts() const
|
||||
{
|
||||
return getDataParts({DataPartState::Committed});
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
|
||||
{
|
||||
return getDataPartsVector({DataPartState::Committed});
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getAllDataParts() const
|
||||
{
|
||||
return getDataParts({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
|
||||
const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock)
|
||||
{
|
||||
auto min_block = std::numeric_limits<Int64>::min();
|
||||
MergeTreePartInfo dummy_part_info(partition_id, min_block, min_block, 0);
|
||||
|
||||
auto committed_parts = getDataPartsRange({DataPartState::Committed});
|
||||
auto it = committed_parts.convert(data_parts.lower_bound(dummy_part_info));
|
||||
|
||||
if (it != committed_parts.end() && (*it)->info.partition_id == partition_id)
|
||||
return *it;
|
||||
return {};
|
||||
}
|
||||
|
||||
void MergeTreeData::Transaction::rollback()
|
||||
{
|
||||
if (data && (!parts_to_remove_on_rollback.empty() || !parts_to_add_on_rollback.empty()))
|
||||
@ -1915,21 +2054,58 @@ void MergeTreeData::Transaction::rollback()
|
||||
|
||||
LOG_DEBUG(data->log, "Undoing transaction." << ss.str());
|
||||
|
||||
data->replaceParts(parts_to_remove_on_rollback, parts_to_add_on_rollback, true);
|
||||
|
||||
/// PreCommitted -> Outdated
|
||||
replaceParts(DataPartState::Outdated, DataPartState::Committed, true);
|
||||
clear();
|
||||
}
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
|
||||
const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock)
|
||||
void MergeTreeData::Transaction::commit()
|
||||
{
|
||||
auto min_block = std::numeric_limits<Int64>::min();
|
||||
MergeTreePartInfo dummy_part_info(partition_id, min_block, min_block, 0);
|
||||
auto it = data_parts.lower_bound(dummy_part_info);
|
||||
if (it != data_parts.end() && (*it)->info.partition_id == partition_id)
|
||||
return *it;
|
||||
return {};
|
||||
/// PreCommitted -> Committed, Committed -> Outdated
|
||||
replaceParts(DataPartState::Committed, DataPartState::Outdated, false);
|
||||
clear();
|
||||
}
|
||||
|
||||
void MergeTreeData::Transaction::replaceParts(MergeTreeData::DataPartState move_precommitted_to,
|
||||
MergeTreeData::DataPartState move_committed_to, bool remove_without_delay)
|
||||
{
|
||||
auto & committed_parts = parts_to_add_on_rollback;
|
||||
auto & precommitted_parts = parts_to_remove_on_rollback;
|
||||
|
||||
/// TODO: it also makes sense to activate CleanupThread's cv
|
||||
auto remove_time = (remove_without_delay) ? 0 : time(nullptr);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data->data_parts_mutex);
|
||||
|
||||
for (auto & part : committed_parts)
|
||||
part->assertState({DataPartState::Committed});
|
||||
for (auto & part : precommitted_parts)
|
||||
part->assertState({DataPartState::PreCommitted});
|
||||
|
||||
/// If this is a rollback, do nothing; otherwise mark the parts as Outdated and remove their size contribution
|
||||
if (move_committed_to != DataPartState::Committed)
|
||||
{
|
||||
for (auto & part : committed_parts)
|
||||
{
|
||||
part->state = move_committed_to;
|
||||
part->remove_time = remove_time;
|
||||
data->removePartContributionToColumnSizes(part);
|
||||
}
|
||||
}
|
||||
|
||||
/// If this is a rollback, just change the state to Outdated; otherwise change the state to Committed and add their size contribution
|
||||
for (auto & part : precommitted_parts)
|
||||
{
|
||||
part->state = move_precommitted_to;
|
||||
if (move_precommitted_to == DataPartState::Committed)
|
||||
data->addPartContributionToColumnSizes(part);
|
||||
else
|
||||
part->remove_time = remove_time;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPart.h>
|
||||
|
||||
#include <common/RangeFiltered.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -99,6 +100,10 @@ public:
|
||||
/// After the DataPart is added to the working set, it cannot be changed.
|
||||
using DataPartPtr = std::shared_ptr<const DataPart>;
|
||||
|
||||
using DataPartState = MergeTreeDataPart::State;
|
||||
using DataPartStates = std::initializer_list<DataPartState>;
|
||||
using DataPartStateVector = std::vector<DataPartState>;
|
||||
|
||||
struct DataPartPtrLess
|
||||
{
|
||||
using is_transparent = void;
|
||||
@ -122,10 +127,7 @@ public:
|
||||
public:
|
||||
Transaction() {}
|
||||
|
||||
void commit()
|
||||
{
|
||||
clear();
|
||||
}
|
||||
void commit();
|
||||
|
||||
void rollback();
|
||||
|
||||
@ -155,6 +157,8 @@ public:
|
||||
parts_to_remove_on_rollback.clear();
|
||||
parts_to_add_on_rollback.clear();
|
||||
}
|
||||
|
||||
void replaceParts(DataPartState move_precommitted_to, DataPartState move_committed_to, bool remove_without_delay);
|
||||
};
|
||||
|
||||
/// An object that stores the names of temporary files created in the part directory during ALTER of its
|
||||
@ -305,10 +309,29 @@ public:
|
||||
String getLogName() const { return log_name; }
|
||||
|
||||
/// Returns a copy of the list so that the caller shouldn't worry about locks.
|
||||
DataParts getDataParts(const DataPartStates & affordable_states) const;
|
||||
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states) const;
|
||||
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector & out_states_snapshot) const;
|
||||
|
||||
/// Returns a virtual container that iterates only through parts with the specified states
|
||||
decltype(auto) getDataPartsRange(const DataPartStates & affordable_states) const
|
||||
{
|
||||
return createRangeFiltered(DataPart::getStatesFilter(affordable_states), data_parts);
|
||||
}
|
||||
|
||||
/// Returns Committed parts
|
||||
DataParts getDataParts() const;
|
||||
DataPartsVector getDataPartsVector() const;
|
||||
|
||||
/// Returns all parts except Temporary and Deleting ones
|
||||
DataParts getAllDataParts() const;
|
||||
|
||||
/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
|
||||
DataPartPtr getActiveContainingPart(const String & part_name);
|
||||
|
||||
/// Returns the part with the given name (and state) or nullptr if no such part.
|
||||
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states = {DataPartState::Committed});
|
||||
|
||||
/// Total size of active parts in bytes.
|
||||
size_t getTotalActiveSizeInBytes() const;
|
||||
|
||||
@ -318,12 +341,6 @@ public:
|
||||
/// If until is non-null, wake up from the sleep earlier if the event happened.
|
||||
void delayInsertIfNeeded(Poco::Event * until = nullptr);
|
||||
|
||||
/// Returns an active part with the given name or a part containing it. If there is no such part,
|
||||
/// returns nullptr.
|
||||
DataPartPtr getActiveContainingPart(const String & part_name);
|
||||
|
||||
/// Returns the part with the given name or nullptr if no such part.
|
||||
DataPartPtr getPartIfExists(const String & part_name);
|
||||
DataPartPtr getShardedPartIfExists(const String & part_name, size_t shard_no);
|
||||
|
||||
/// Renames temporary part to a permanent part and adds it to the working set.
|
||||
@ -337,26 +354,26 @@ public:
|
||||
DataPartsVector renameTempPartAndReplace(
|
||||
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
|
||||
|
||||
/// Removes from the working set parts in remove and adds parts in add. Parts in add must already be in
|
||||
/// all_data_parts.
|
||||
/// Removes the given parts from the working set.
|
||||
/// Parts in remove must already be in data_parts with PreCommitted, Committed, or Outdated states.
|
||||
/// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to
|
||||
/// clearOldParts (ignoring old_parts_lifetime).
|
||||
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout);
|
||||
void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout);
|
||||
|
||||
/// Renames the part to detached/<prefix>_<part> and forgets about it. The data won't be deleted in
|
||||
/// clearOldParts.
|
||||
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
|
||||
void renameAndDetachPart(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false, bool move_to_detached = true);
|
||||
|
||||
/// Removes the part from the list of parts (including all_data_parts), but doesn't move the directory.
|
||||
void detachPartInPlace(const DataPartPtr & part);
|
||||
|
||||
/// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts
|
||||
/// but not from the disk.
|
||||
DataPartsVector grabOldParts();
|
||||
|
||||
/// Reverts the changes made by grabOldParts().
|
||||
void addOldParts(const DataPartsVector & parts);
|
||||
/// Reverts the changes made by grabOldParts(), parts should be in Deleting state.
|
||||
void rollbackDeletingParts(const DataPartsVector & parts);
|
||||
|
||||
/// Removes parts from data_parts, they should be in Deleting state
|
||||
void removePartsFinally(const DataPartsVector & parts);
|
||||
|
||||
/// Delete irrelevant parts.
|
||||
void clearOldParts();
|
||||
@ -400,12 +417,6 @@ public:
|
||||
broken_part_callback(name);
|
||||
}
|
||||
|
||||
/// Delete old parts from disk and ZooKeeper (in replicated case)
|
||||
void clearOldPartsAndRemoveFromZK()
|
||||
{
|
||||
parts_clean_callback();
|
||||
}
|
||||
|
||||
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
|
||||
SortDescription getSortDescription() const { return sort_descr; }
|
||||
|
||||
@ -523,8 +534,6 @@ private:
|
||||
|
||||
/// Engine-specific methods
|
||||
BrokenPartCallback broken_part_callback;
|
||||
/// Use to delete outdated parts immediately from memory, disk and ZooKeeper
|
||||
PartsCleanCallback parts_clean_callback;
|
||||
|
||||
String log_name;
|
||||
Logger * log;
|
||||
@ -536,8 +545,8 @@ private:
|
||||
/// The set of all data parts including already merged but not yet deleted. Usually it is small (tens of elements).
|
||||
/// The part is referenced from here, from the list of current parts and from each thread reading from it.
|
||||
/// This means that if reference count is 1 - the part is not used right now and can be deleted.
|
||||
DataParts all_data_parts;
|
||||
mutable std::mutex all_data_parts_mutex;
|
||||
// DataParts all_data_parts;
|
||||
// mutable std::mutex all_data_parts_mutex;
|
||||
|
||||
/// Used to serialize calls to grabOldParts.
|
||||
std::mutex grab_old_parts_mutex;
|
||||
@ -572,7 +581,7 @@ private:
|
||||
void addPartContributionToColumnSizes(const DataPartPtr & part);
|
||||
void removePartContributionToColumnSizes(const DataPartPtr & part);
|
||||
|
||||
/// If there is no part in the partition with ID `partition_id`, returns empty ptr.
|
||||
/// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
|
||||
DataPartPtr getAnyPartInPartition(const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock);
|
||||
};
|
||||
|
||||
|
@ -500,8 +500,8 @@ size_t MergeTreeDataPart::calcTotalSize(const String & from)
|
||||
std::vector<std::string> files;
|
||||
cur.list(files);
|
||||
size_t res = 0;
|
||||
for (size_t i = 0; i < files.size(); ++i)
|
||||
res += calcTotalSize(from + files[i]);
|
||||
for (const auto & file : files)
|
||||
res += calcTotalSize(from + file);
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -541,6 +541,7 @@ void MergeTreeDataPart::remove() const
|
||||
LOG_WARNING(storage.log, "Directory " << from << " (part to remove) doesn't exist or one of nested files has gone."
|
||||
" Most likely this is due to manual removing. This should be discouraged. Ignoring.");
|
||||
|
||||
std::terminate();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -576,7 +577,7 @@ void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_n
|
||||
}
|
||||
}
|
||||
|
||||
from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
|
||||
from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(nullptr)));
|
||||
from_file.renameTo(to);
|
||||
relative_path = new_relative_path;
|
||||
}
|
||||
@ -910,4 +911,28 @@ size_t MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Maps a part state to its textual name.
/// Throws LOGICAL_ERROR if the value does not correspond to any known state.
String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
{
    if (state == State::Temporary)
        return "Temporary";

    if (state == State::PreCommitted)
        return "PreCommitted";

    if (state == State::Committed)
        return "Committed";

    if (state == State::Outdated)
        return "Outdated";

    if (state == State::Deleting)
        return "Deleting";

    /// Reached only for a value outside the enumerators above.
    const auto raw_state = static_cast<int>(state);
    throw Exception("Unknown part state " + std::to_string(raw_state), ErrorCodes::LOGICAL_ERROR);
}
|
||||
|
||||
/// Returns the textual name of this part's current state (delegates to stateToString).
String MergeTreeDataPart::stateString() const
|
||||
{
|
||||
return stateToString(state);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -139,9 +139,87 @@ struct MergeTreeDataPart
|
||||
/// If true, the destructor will delete the directory with the part.
|
||||
bool is_temp = false;
|
||||
|
||||
/// If true, there is no ZooKeeper node for this part, so it should be deleted only from the filesystem
|
||||
bool is_duplicate = false;
|
||||
|
||||
/// For resharding.
|
||||
size_t shard_no = 0;
|
||||
|
||||
/**
|
||||
* Part state is a stage of its lifetime. States are ordered, and the state of a part normally only increases (the Deleting -> Outdated retry listed below is the exception).
|
||||
* Part state should be modified under data_parts mutex.
|
||||
*
|
||||
* Possible state transitions:
|
||||
* Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set
|
||||
* Precommitted -> Outdated: we could not add the part to the active set and are doing a rollback (for example, it is a duplicated part)
|
||||
* Precommitted -> Committed: we successfully committed the part to the active dataset
|
||||
* Committed -> Outdated: a part was replaced by a covering part or DROP PARTITION
|
||||
* Outdated -> Deleting: a cleaner selected this part for deletion
|
||||
* Deleting -> Outdated: if a ZooKeeper error occurred during the deletion, we will retry the deletion
|
||||
*/
|
||||
enum class State
|
||||
{
|
||||
Temporary, /// the part is being generated now; it is not in the data_parts list
|
||||
PreCommitted, /// the part is in data_parts, but is not used for SELECTs
|
||||
Committed, /// active data part, used by current and upcoming SELECTs
|
||||
Outdated, /// inactive data part; it may still be used by SELECTs that are already running and can be deleted after those SELECTs finish
|
||||
Deleting /// inactive data part with identity refcounter (presumably: nobody else references it — TODO confirm); it is being deleted right now by a cleaner
|
||||
};
|
||||
|
||||
/// Current state of the part. If the part is already in the working set, the state should be accessed under the data_parts mutex
|
||||
mutable State state{State::Temporary};
|
||||
|
||||
/// Returns name of state
|
||||
static String stateToString(State state);
|
||||
String stateString() const;
|
||||
|
||||
/// Returns the part name followed by its current state, formatted as "<name> (state <StateName>)".
String getNameWithState() const
|
||||
{
|
||||
return name + " (state " + stateString() + ")";
|
||||
}
|
||||
|
||||
/// Returns true if state of part is one of affordable_states
|
||||
bool checkState(const std::initializer_list<State> & affordable_states) const
|
||||
{
|
||||
for (auto affordable_state : affordable_states)
|
||||
{
|
||||
if (state == affordable_state)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Throws an exception if state of the part is not in affordable_states
|
||||
void assertState(const std::initializer_list<State> & affordable_states) const
|
||||
{
|
||||
if (!checkState(affordable_states))
|
||||
{
|
||||
String states_str;
|
||||
for (auto state : affordable_states)
|
||||
states_str += stateToString(state) + " ";
|
||||
|
||||
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str);
|
||||
}
|
||||
}
|
||||
|
||||
/// In comparison with lambdas, it is move assignable and could has several overloaded operator()
|
||||
struct StatesFilter
|
||||
{
|
||||
std::initializer_list<State> affordable_states;
|
||||
StatesFilter(const std::initializer_list<State> & affordable_states) : affordable_states(affordable_states) {}
|
||||
|
||||
bool operator() (const std::shared_ptr<const MergeTreeDataPart> & part) const
|
||||
{
|
||||
return part->checkState(affordable_states);
|
||||
}
|
||||
};
|
||||
|
||||
/// Returns a StatesFilter functor (not a lambda) that returns true only for parts whose state is in the specified list
|
||||
static inline StatesFilter getStatesFilter(const std::initializer_list<State> & affordable_states)
|
||||
{
|
||||
return StatesFilter(affordable_states);
|
||||
}
|
||||
|
||||
/// Primary key (correspond to primary.idx file).
|
||||
/// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple.
|
||||
/// Note that marks (also correspond to primary key) is not always in RAM, but cached. See MarkCache.h.
|
||||
|
@ -24,8 +24,8 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
|
||||
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_)
|
||||
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_),
|
||||
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, bool deduplicate_)
|
||||
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), deduplicate(deduplicate_),
|
||||
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
|
||||
{
|
||||
/// The quorum value `1` has the same meaning as if it is disabled.
|
||||
@ -91,6 +91,8 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
|
||||
|
||||
void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
last_block_is_duplicate = false;
|
||||
|
||||
/// TODO Is it possible to not lock the table structure here?
|
||||
storage.data.delayInsertIfNeeded(&storage.restarting_thread->getWakeupEvent());
|
||||
|
||||
@ -115,21 +117,28 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
|
||||
|
||||
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);
|
||||
|
||||
SipHash hash;
|
||||
part->checksums.summaryDataChecksum(hash);
|
||||
union
|
||||
String block_id;
|
||||
|
||||
if (deduplicate)
|
||||
{
|
||||
char bytes[16];
|
||||
UInt64 words[2];
|
||||
} hash_value;
|
||||
hash.get128(hash_value.bytes);
|
||||
SipHash hash;
|
||||
part->checksums.summaryDataChecksum(hash);
|
||||
union
|
||||
{
|
||||
char bytes[16];
|
||||
UInt64 words[2];
|
||||
} hash_value;
|
||||
hash.get128(hash_value.bytes);
|
||||
|
||||
String checksum(hash_value.bytes, 16);
|
||||
/// We take the hash from the data as ID. That is, do not insert the same data twice.
|
||||
block_id = toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
|
||||
|
||||
/// We take the hash from the data as ID. That is, do not insert the same data twice.
|
||||
String block_id = toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
|
||||
|
||||
LOG_DEBUG(log, "Wrote block with ID '" << block_id << "', " << block.rows() << " rows");
|
||||
LOG_DEBUG(log, "Wrote block with ID '" << block_id << "', " << block.rows() << " rows");
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Wrote block with " << block.rows() << " rows");
|
||||
}
|
||||
|
||||
commitPart(zookeeper, part, block_id);
|
||||
|
||||
@ -141,6 +150,8 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
|
||||
|
||||
void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
|
||||
{
|
||||
last_block_is_duplicate = false;
|
||||
|
||||
/// NOTE No delay in this case. That's Ok.
|
||||
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
@ -294,7 +305,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
|
||||
{
|
||||
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
|
||||
|
||||
part->is_duplicate = true;
|
||||
transaction.rollback();
|
||||
last_block_is_duplicate = true;
|
||||
}
|
||||
else if (zookeeper->exists(quorum_info.status_path))
|
||||
{
|
||||
|
@ -22,14 +22,20 @@ class StorageReplicatedMergeTree;
|
||||
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_,
|
||||
size_t quorum_, size_t quorum_timeout_ms_);
|
||||
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_,
|
||||
bool deduplicate_);
|
||||
|
||||
void write(const Block & block) override;
|
||||
|
||||
/// For ATTACHing existing data on filesystem.
|
||||
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
|
||||
|
||||
/// For proper deduplication in MaterializedViews.
/// Returns the last_block_is_duplicate flag: it is reset to false at the start of write() and writeExistingPart()
/// and set to true when commitPart() finds that a block with the same ID already exists.
|
||||
bool lastBlockIsDuplicate() const
|
||||
{
|
||||
return last_block_is_duplicate;
|
||||
}
|
||||
|
||||
private:
|
||||
struct QuorumInfo
|
||||
{
|
||||
@ -49,6 +55,9 @@ private:
|
||||
size_t quorum;
|
||||
size_t quorum_timeout_ms;
|
||||
|
||||
bool deduplicate = true;
|
||||
bool last_block_is_duplicate = false;
|
||||
|
||||
using Logger = Poco::Logger;
|
||||
Logger * log;
|
||||
};
|
||||
|
@ -37,7 +37,7 @@ void ReplicatedMergeTreeCleanupThread::run()
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
|
||||
storage.shutdown_event.tryWait(CLEANUP_SLEEP_MS);
|
||||
storage.cleanup_thread_event.tryWait(CLEANUP_SLEEP_MS);
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Cleanup thread finished");
|
||||
@ -46,7 +46,7 @@ void ReplicatedMergeTreeCleanupThread::run()
|
||||
|
||||
void ReplicatedMergeTreeCleanupThread::iterate()
|
||||
{
|
||||
storage.clearOldPartsAndRemoveFromZK(log);
|
||||
storage.clearOldPartsAndRemoveFromZK();
|
||||
storage.data.clearOldTemporaryDirectories();
|
||||
|
||||
if (storage.is_leader_node)
|
||||
|
@ -367,6 +367,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
|
||||
storage.merge_selecting_event.set();
|
||||
storage.queue_updating_event->set();
|
||||
storage.alter_query_event->set();
|
||||
storage.cleanup_thread_event.set();
|
||||
storage.replica_is_active_node = nullptr;
|
||||
|
||||
LOG_TRACE(log, "Waiting for threads to finish");
|
||||
|
@ -393,8 +393,9 @@ StoragePtr StorageFactory::get(
|
||||
|
||||
if (query.is_materialized_view)
|
||||
{
|
||||
/// Pass local_context here to convey setting for inner table
|
||||
return StorageMaterializedView::create(
|
||||
table_name, database_name, context, query, columns,
|
||||
table_name, database_name, local_context, query, columns,
|
||||
materialized_columns, alias_columns, column_defaults,
|
||||
attach);
|
||||
}
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <Storages/VirtualColumnFactory.h>
|
||||
|
||||
#include <Common/typeid_cast.h>
|
||||
#include "StorageReplicatedMergeTree.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -56,7 +57,7 @@ static void extractDependentTable(const ASTSelectQuery & query, String & select_
|
||||
StorageMaterializedView::StorageMaterializedView(
|
||||
const String & table_name_,
|
||||
const String & database_name_,
|
||||
Context & context_,
|
||||
Context & local_context,
|
||||
const ASTCreateQuery & query,
|
||||
NamesAndTypesListPtr columns_,
|
||||
const NamesAndTypesList & materialized_columns_,
|
||||
@ -64,7 +65,7 @@ StorageMaterializedView::StorageMaterializedView(
|
||||
const ColumnDefaults & column_defaults_,
|
||||
bool attach_)
|
||||
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
|
||||
database_name(database_name_), context(context_), columns(columns_)
|
||||
database_name(database_name_), global_context(local_context.getGlobalContext()), columns(columns_)
|
||||
{
|
||||
if (!query.select)
|
||||
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
|
||||
@ -75,7 +76,7 @@ StorageMaterializedView::StorageMaterializedView(
|
||||
extractDependentTable(*query.select, select_database_name, select_table_name);
|
||||
|
||||
if (!select_table_name.empty())
|
||||
context.getGlobalContext().addDependency(
|
||||
global_context.addDependency(
|
||||
DatabaseAndTableName(select_database_name, select_table_name),
|
||||
DatabaseAndTableName(database_name, table_name));
|
||||
|
||||
@ -95,14 +96,14 @@ StorageMaterializedView::StorageMaterializedView(
|
||||
/// Execute the query.
|
||||
try
|
||||
{
|
||||
InterpreterCreateQuery create_interpreter(manual_create_query, context);
|
||||
InterpreterCreateQuery create_interpreter(manual_create_query, local_context);
|
||||
create_interpreter.execute();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// In case of any error we should remove dependency to the view.
|
||||
if (!select_table_name.empty())
|
||||
context.getGlobalContext().removeDependency(
|
||||
global_context.removeDependency(
|
||||
DatabaseAndTableName(select_database_name, select_table_name),
|
||||
DatabaseAndTableName(database_name, table_name));
|
||||
|
||||
@ -139,20 +140,20 @@ BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const
|
||||
|
||||
void StorageMaterializedView::drop()
|
||||
{
|
||||
context.getGlobalContext().removeDependency(
|
||||
global_context.removeDependency(
|
||||
DatabaseAndTableName(select_database_name, select_table_name),
|
||||
DatabaseAndTableName(database_name, table_name));
|
||||
|
||||
auto inner_table_name = getInnerTableName();
|
||||
|
||||
if (context.tryGetTable(database_name, inner_table_name))
|
||||
if (global_context.tryGetTable(database_name, inner_table_name))
|
||||
{
|
||||
/// We create and execute `drop` query for internal table.
|
||||
auto drop_query = std::make_shared<ASTDropQuery>();
|
||||
drop_query->database = database_name;
|
||||
drop_query->table = inner_table_name;
|
||||
ASTPtr ast_drop_query = drop_query;
|
||||
InterpreterDropQuery drop_interpreter(ast_drop_query, context);
|
||||
InterpreterDropQuery drop_interpreter(ast_drop_query, global_context);
|
||||
drop_interpreter.execute();
|
||||
}
|
||||
}
|
||||
@ -164,7 +165,7 @@ bool StorageMaterializedView::optimize(const ASTPtr & query, const ASTPtr & part
|
||||
|
||||
StoragePtr StorageMaterializedView::getInnerTable() const
|
||||
{
|
||||
return context.getTable(database_name, getInnerTableName());
|
||||
return global_context.getTable(database_name, getInnerTableName());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -51,13 +51,13 @@ private:
|
||||
String table_name;
|
||||
String database_name;
|
||||
ASTPtr inner_query;
|
||||
Context & context;
|
||||
Context & global_context;
|
||||
NamesAndTypesListPtr columns;
|
||||
|
||||
StorageMaterializedView(
|
||||
const String & table_name_,
|
||||
const String & database_name_,
|
||||
Context & context_,
|
||||
Context & local_context,
|
||||
const ASTCreateQuery & query,
|
||||
NamesAndTypesListPtr columns_,
|
||||
const NamesAndTypesList & materialized_columns_,
|
||||
|
@ -482,7 +482,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partit
|
||||
if (detach)
|
||||
data.renameAndDetachPart(part, "");
|
||||
else
|
||||
data.replaceParts({part}, {}, false);
|
||||
data.removePartsFromWorkingSet({part}, false);
|
||||
}
|
||||
|
||||
LOG_INFO(log, (detach ? "Detached " : "Removed ") << removed_parts << " parts inside partition ID " << partition_id << ".");
|
||||
|
@ -1468,7 +1468,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
|
||||
|
||||
/// If the part needs to be removed, it is more reliable to delete the directory after the changes in ZooKeeper.
|
||||
if (!entry.detach)
|
||||
data.replaceParts({part}, {}, true);
|
||||
data.removePartsFromWorkingSet({part}, true);
|
||||
}
|
||||
|
||||
LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << ".");
|
||||
@ -2155,6 +2155,14 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
|
||||
|
||||
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum)
|
||||
{
|
||||
if (auto part = data.getPartIfExists(part_name, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
|
||||
{
|
||||
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
|
||||
/// Force premature parts cleanup
|
||||
cleanup_thread_event.set();
|
||||
return false;
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(currently_fetching_parts_mutex);
|
||||
if (!currently_fetching_parts.insert(part_name).second)
|
||||
@ -2407,8 +2415,10 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & query, con
|
||||
{
|
||||
assertNotReadonly();
|
||||
|
||||
bool deduplicate = data.settings.replicated_deduplication_window != 0 && settings.insert_deduplicate;
|
||||
|
||||
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
|
||||
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds());
|
||||
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), deduplicate);
|
||||
}
|
||||
|
||||
|
||||
@ -2832,7 +2842,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
|
||||
loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir + part));
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0); /// TODO Allow to use quorum here.
|
||||
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false); /// TODO Allow to use quorum here.
|
||||
for (auto & part : loaded_parts)
|
||||
{
|
||||
String old_name = part->name;
|
||||
@ -3534,10 +3544,10 @@ void StorageReplicatedMergeTree::reshardPartitions(
|
||||
|
||||
/// Make a list of local partitions that need to be resharded.
|
||||
std::set<std::string> unique_partition_list;
|
||||
const MergeTreeData::DataParts & data_parts = data.getDataParts();
|
||||
for (MergeTreeData::DataParts::iterator it = data_parts.cbegin(); it != data_parts.cend(); ++it)
|
||||
auto data_parts = data.getDataParts();
|
||||
for (auto & part : data_parts)
|
||||
{
|
||||
const String & current_partition_id = (*it)->info.partition_id;
|
||||
const String & current_partition_id = part->info.partition_id;
|
||||
if (include_all || partition_id == current_partition_id)
|
||||
unique_partition_list.insert(current_partition_id);
|
||||
}
|
||||
@ -3853,64 +3863,99 @@ bool StorageReplicatedMergeTree::checkSpaceForResharding(const ReplicaToSpaceInf
|
||||
}
|
||||
|
||||
|
||||
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
|
||||
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
|
||||
{
|
||||
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
|
||||
|
||||
Logger * log = log_ ? log_ : this->log;
|
||||
|
||||
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto zookeeper = getZooKeeper();
|
||||
|
||||
MergeTreeData::DataPartsVector parts = data.grabOldParts();
|
||||
size_t count = parts.size();
|
||||
|
||||
if (!count)
|
||||
if (parts.empty())
|
||||
return;
|
||||
|
||||
/// Part names that were successfully deleted from filesystem and should be deleted from ZooKeeper
|
||||
Strings part_names;
|
||||
auto remove_from_zookeeper = [&] ()
|
||||
{
|
||||
LOG_DEBUG(log, "Removed " << part_names.size() << " old parts from filesystem. Removing them from ZooKeeper.");
|
||||
MergeTreeData::DataPartsVector parts_to_delete_only_from_filesystem; // Only duplicates
|
||||
MergeTreeData::DataPartsVector parts_to_delete_completely; // All parts except duplicates
|
||||
MergeTreeData::DataPartsVector parts_to_retry_deletion; // Parts that should be retried due to network problems
|
||||
MergeTreeData::DataPartsVector parts_to_remove_from_filesystem; // Parts removed from ZK
|
||||
|
||||
try
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
if (!part->is_duplicate)
|
||||
parts_to_delete_completely.emplace_back(part);
|
||||
else
|
||||
parts_to_delete_only_from_filesystem.emplace_back(part);
|
||||
}
|
||||
parts.clear();
|
||||
|
||||
auto remove_parts_from_filesystem = [log=log] (const MergeTreeData::DataPartsVector & parts_to_remove)
|
||||
{
|
||||
for (auto & part : parts_to_remove)
|
||||
{
|
||||
removePartsFromZooKeeper(zookeeper, part_names);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_ERROR(log, "There is a problem with deleting parts from ZooKeeper: " << getCurrentExceptionMessage(false));
|
||||
try
|
||||
{
|
||||
part->remove();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log, "There is a problem with deleting part " + part->name + " from filesystem");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Delete duplicate parts from filesystem
|
||||
if (!parts_to_delete_only_from_filesystem.empty())
|
||||
{
|
||||
remove_parts_from_filesystem(parts_to_delete_only_from_filesystem);
|
||||
data.removePartsFinally(parts_to_delete_only_from_filesystem);
|
||||
|
||||
LOG_DEBUG(log, "Removed " << parts_to_delete_only_from_filesystem.size() << " old duplicate parts");
|
||||
}
|
||||
|
||||
/// Delete normal parts from ZooKeeper
|
||||
NameSet part_names_to_retry_deletion;
|
||||
try
|
||||
{
|
||||
LOG_DEBUG(log, "Removing " << parts.size() << " old parts from filesystem");
|
||||
Strings part_names_to_delete_completely;
|
||||
for (const auto & part : parts_to_delete_completely)
|
||||
part_names_to_delete_completely.emplace_back(part->name);
|
||||
|
||||
while (!parts.empty())
|
||||
{
|
||||
MergeTreeData::DataPartPtr & part = parts.back();
|
||||
part->remove();
|
||||
part_names.emplace_back(part->name);
|
||||
parts.pop_back();
|
||||
}
|
||||
LOG_DEBUG(log, "Removing " << parts_to_delete_completely.size() << " old parts from ZooKeeper");
|
||||
removePartsFromZooKeeper(zookeeper, part_names_to_delete_completely, &part_names_to_retry_deletion);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
|
||||
/// Finalize deletion of parts already deleted from filesystem, rollback remaining parts
|
||||
data.addOldParts(parts);
|
||||
remove_from_zookeeper();
|
||||
|
||||
throw;
|
||||
LOG_ERROR(log, "There is a problem with deleting parts from ZooKeeper: " << getCurrentExceptionMessage(false));
|
||||
}
|
||||
|
||||
/// Finalize deletion
|
||||
remove_from_zookeeper();
|
||||
/// Part names that were reliably deleted from ZooKeeper should be deleted from filesystem
|
||||
auto num_reliably_deleted_parts = parts_to_delete_completely.size() - part_names_to_retry_deletion.size();
|
||||
LOG_DEBUG(log, "Removed " << num_reliably_deleted_parts << " old parts from ZooKeeper. Removing them from filesystem.");
|
||||
|
||||
LOG_DEBUG(log, "Removed " << count << " old parts");
|
||||
/// Split normal parts into two sets: parts to remove from the filesystem and parts whose deletion must be retried
|
||||
for (auto & part : parts_to_delete_completely)
|
||||
{
|
||||
if (part_names_to_retry_deletion.count(part->name) == 0)
|
||||
parts_to_remove_from_filesystem.emplace_back(part);
|
||||
else
|
||||
parts_to_retry_deletion.emplace_back(part);
|
||||
}
|
||||
|
||||
/// Will retry deletion
|
||||
if (!parts_to_retry_deletion.empty())
|
||||
{
|
||||
data.rollbackDeletingParts(parts_to_retry_deletion);
|
||||
LOG_DEBUG(log, "Will retry deletion of " << parts_to_retry_deletion.size() << " parts in the next time");
|
||||
}
|
||||
|
||||
/// Remove parts from filesystem and finally from data_parts
|
||||
if (!parts_to_remove_from_filesystem.empty())
|
||||
{
|
||||
remove_parts_from_filesystem(parts_to_remove_from_filesystem);
|
||||
data.removePartsFinally(parts_to_remove_from_filesystem);
|
||||
|
||||
LOG_DEBUG(log, "Removed " << parts_to_remove_from_filesystem.size() << " old parts");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -3930,7 +3975,8 @@ static int32_t tryMultiWithRetries(zkutil::ZooKeeperPtr & zookeeper, zkutil::Ops
|
||||
}
|
||||
|
||||
|
||||
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names)
|
||||
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
|
||||
NameSet * parts_should_be_retied)
|
||||
{
|
||||
zkutil::Ops ops;
|
||||
auto it_first_node_in_batch = part_names.cbegin();
|
||||
@ -3958,11 +4004,24 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr &
|
||||
auto cur_code = tryMultiWithRetries(zookeeper, cur_ops);
|
||||
|
||||
if (cur_code == ZNONODE)
|
||||
{
|
||||
LOG_DEBUG(log, "There is no part " << *it_in_batch << " in ZooKeeper, it was only in filesystem");
|
||||
}
|
||||
else if (parts_should_be_retied && zkutil::isHardwareErrorCode(cur_code))
|
||||
{
|
||||
parts_should_be_retied->emplace(*it_in_batch);
|
||||
}
|
||||
else if (cur_code != ZOK)
|
||||
{
|
||||
LOG_WARNING(log, "Cannot remove part " << *it_in_batch << " from ZooKeeper: " << ::zerror(cur_code));
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (parts_should_be_retied && zkutil::isHardwareErrorCode(code))
|
||||
{
|
||||
for (auto it_in_batch = it_first_node_in_batch; it_in_batch != it_next; ++it_in_batch)
|
||||
parts_should_be_retied->emplace(*it_in_batch);
|
||||
}
|
||||
else if (code != ZOK)
|
||||
{
|
||||
LOG_WARNING(log, "There was a problem with deleting " << (it_next - it_first_node_in_batch)
|
||||
|
@ -205,7 +205,7 @@ public:
|
||||
|
||||
private:
|
||||
/// Delete old chunks from disk and from ZooKeeper.
|
||||
void clearOldPartsAndRemoveFromZK(Logger * log_ = nullptr);
|
||||
void clearOldPartsAndRemoveFromZK();
|
||||
|
||||
friend class ReplicatedMergeTreeBlockOutputStream;
|
||||
friend class ReplicatedMergeTreeRestartingThread;
|
||||
@ -307,6 +307,8 @@ private:
|
||||
|
||||
/// A thread that removes old parts, log entries, and blocks.
|
||||
std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread;
|
||||
/// Is used to wakeup cleanup_thread
|
||||
Poco::Event cleanup_thread_event;
|
||||
|
||||
/// A thread that processes reconnection to ZooKeeper when the session expires.
|
||||
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
|
||||
@ -380,7 +382,8 @@ private:
|
||||
void removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops);
|
||||
|
||||
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
|
||||
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names);
|
||||
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
|
||||
NameSet * parts_should_be_retied = nullptr);
|
||||
|
||||
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
|
||||
void removePartAndEnqueueFetch(const String & part_name);
|
||||
|
@ -174,33 +174,41 @@ BlockInputStreams StorageSystemParts::read(
|
||||
*/
|
||||
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
|
||||
continue;
|
||||
else
|
||||
throw;
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
String engine = storage->getName();
|
||||
|
||||
MergeTreeData * data = nullptr;
|
||||
|
||||
if (StorageMergeTree * merge_tree = dynamic_cast<StorageMergeTree *>(&*storage))
|
||||
if (auto merge_tree = dynamic_cast<StorageMergeTree *>(&*storage))
|
||||
{
|
||||
data = &merge_tree->getData();
|
||||
}
|
||||
else if (StorageReplicatedMergeTree * replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
|
||||
else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
|
||||
{
|
||||
data = &replicated_merge_tree->getData();
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts active_parts = data->getDataParts();
|
||||
MergeTreeData::DataParts all_parts;
|
||||
if (need[0])
|
||||
all_parts = data->getAllDataParts();
|
||||
else
|
||||
all_parts = active_parts;
|
||||
{
|
||||
throw Exception("Unknown engine " + engine, ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
using State = MergeTreeDataPart::State;
|
||||
MergeTreeData::DataPartStateVector all_parts_state;
|
||||
MergeTreeData::DataPartsVector all_parts;
|
||||
if (need[0])
|
||||
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, all_parts_state);
|
||||
else
|
||||
all_parts = data->getDataPartsVector({State::Committed}, all_parts_state);
|
||||
|
||||
/// Finally, we'll go through the list of parts.
|
||||
for (const MergeTreeData::DataPartPtr & part : all_parts)
|
||||
for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
|
||||
{
|
||||
const auto & part = all_parts[part_number];
|
||||
auto part_state = all_parts_state[part_number];
|
||||
|
||||
size_t i = 0;
|
||||
{
|
||||
WriteBufferFromOwnString out;
|
||||
@ -208,7 +216,7 @@ BlockInputStreams StorageSystemParts::read(
|
||||
block.getByPosition(i++).column->insert(out.str());
|
||||
}
|
||||
block.getByPosition(i++).column->insert(part->name);
|
||||
block.getByPosition(i++).column->insert(static_cast<UInt64>(active_parts.count(part)));
|
||||
block.getByPosition(i++).column->insert(static_cast<UInt64>(part_state == State::Committed));
|
||||
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->marks_count));
|
||||
|
||||
size_t marks_size = 0;
|
||||
@ -227,7 +235,7 @@ BlockInputStreams StorageSystemParts::read(
|
||||
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->remove_time));
|
||||
|
||||
/// For convenience, in the returned refcount, don't count references that were due to local variables in this method: all_parts, active_parts.
|
||||
block.getByPosition(i++).column->insert(static_cast<UInt64>(part.use_count() - (active_parts.count(part) ? 2 : 1)));
|
||||
block.getByPosition(i++).column->insert(static_cast<UInt64>(part.use_count() - 1));
|
||||
|
||||
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMinDate()));
|
||||
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMaxDate()));
|
||||
|
45
dbms/src/Storages/tests/gtest_range_filtered.cpp.cpp
Normal file
45
dbms/src/Storages/tests/gtest_range_filtered.cpp.cpp
Normal file
@ -0,0 +1,45 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <common/RangeFiltered.h>
|
||||
#include <vector>
|
||||
#include <set>
|
||||
|
||||
|
||||
/// Checks that RangeFiltered visits only the elements accepted by its predicate and that its
/// iterators keep working when an element stops satisfying the predicate after the iterator was created.
TEST(RangeFiltered, simple)
{
    std::vector<int> v;

    for (int i = 0; i < 10; ++i)
        v.push_back(i);

    auto v30 = createRangeFiltered([] (int i) { return i % 3 == 0;}, v);
    auto v31 = createRangeFiltered([] (int i) { return i % 3 != 0;}, v);

    for (const int & i : v30)
        ASSERT_EQ(i % 3, 0);

    for (const int & i : v31)
        ASSERT_NE(i % 3, 0);

    {
        auto it = v30.begin();
        ASSERT_EQ(*it, 0);

        auto it2 = std::next(it);
        ASSERT_EQ(*it2, 3);

        auto it3 = it;
        it = std::next(it2);
        ASSERT_EQ(*it, 6);

        /// A copy taken before the reassignment must not be affected by it.
        ASSERT_EQ(*it3, 0);
    }

    {
        auto it = std::next(v30.begin());
        ASSERT_EQ(*it, 3);

        /// RangeFiltered holds the container by const reference, so elements are changed through the container itself.
        v[3] = 2; /// the element under `it` no longer satisfies the filter
        ASSERT_EQ(*(++it), 6); /// but iteration is successful

        v[0] = 1;
        ASSERT_EQ(*v30.begin(), 6);
    }
}
|
@ -5,7 +5,7 @@ export PATH=$PATH:/home/milovidov/cov-analysis-linux64-2017.07/bin
|
||||
|
||||
mkdir ClickHouse_coverity
|
||||
cd ClickHouse_coverity
|
||||
git clone git@github.com:yandex/ClickHouse.git .
|
||||
git clone --recursive git@github.com:yandex/ClickHouse.git .
|
||||
|
||||
mkdir build
|
||||
cd build
|
||||
|
@ -3,5 +3,6 @@
|
||||
<replicated_deduplication_window>999999999</replicated_deduplication_window>
|
||||
<replicated_deduplication_window_seconds>1</replicated_deduplication_window_seconds>
|
||||
<cleanup_delay_period>1</cleanup_delay_period>
|
||||
<old_parts_lifetime>1</old_parts_lifetime>
|
||||
</merge_tree>
|
||||
</yandex>
|
||||
|
@ -7,31 +7,19 @@ from helpers.cluster import ClickHouseCluster
|
||||
from helpers.network import PartitionManager
|
||||
from helpers.test_tools import TSV
|
||||
from helpers.client import CommandRequest
|
||||
from helpers.client import QueryTimeoutExceedException
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance('node1', config_dir='configs', with_zookeeper=True)
|
||||
node2 = cluster.add_instance('node2', config_dir='configs', with_zookeeper=True)
|
||||
node1 = cluster.add_instance('node1', config_dir='configs', with_zookeeper=True, macroses={"layer": 0, "shard": 0, "replica": 1})
|
||||
node2 = cluster.add_instance('node2', config_dir='configs', with_zookeeper=True, macroses={"layer": 0, "shard": 0, "replica": 2})
|
||||
nodes = [node1, node2]
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
for node in nodes:
|
||||
node.query('''
|
||||
CREATE TABLE simple (date Date, id UInt32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
|
||||
'''.format(replica=node.name))
|
||||
node.query("INSERT INTO simple VALUES (0, 0)")
|
||||
|
||||
node.query('''
|
||||
CREATE TABLE simple2 (date Date, id UInt32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple2', '{replica}', date, id, 8192);
|
||||
'''.format(replica=node.name))
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
@ -42,24 +30,24 @@ ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple2', '{replica}', date,
|
||||
def test_deduplication_window_in_seconds(started_cluster):
|
||||
node = node1
|
||||
|
||||
node.query("INSERT INTO simple2 VALUES (0, 0)")
|
||||
node1.query("""
|
||||
CREATE TABLE simple ON CLUSTER test_cluster (date Date, id UInt32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}', date, id, 8192)""")
|
||||
|
||||
node.query("INSERT INTO simple VALUES (0, 0)")
|
||||
time.sleep(1)
|
||||
node.query("INSERT INTO simple2 VALUES (0, 0)") # deduplication works here
|
||||
node.query("INSERT INTO simple2 VALUES (0, 1)")
|
||||
assert TSV(node.query("SELECT count() FROM simple2")) == TSV("2\n")
|
||||
node.query("INSERT INTO simple VALUES (0, 0)") # deduplication works here
|
||||
node.query("INSERT INTO simple VALUES (0, 1)")
|
||||
assert TSV(node.query("SELECT count() FROM simple")) == TSV("2\n")
|
||||
|
||||
# wait clean thread
|
||||
time.sleep(2)
|
||||
|
||||
assert TSV.toMat(node.query("SELECT count() FROM system.zookeeper WHERE path='/clickhouse/tables/0/simple2/blocks'"))[0][0] == "1"
|
||||
node.query("INSERT INTO simple2 VALUES (0, 0)") # deduplication doesn't works here, the first hash node was deleted
|
||||
assert TSV.toMat(node.query("SELECT count() FROM simple2"))[0][0] == "3"
|
||||
assert TSV.toMat(node.query("SELECT count() FROM system.zookeeper WHERE path='/clickhouse/tables/0/simple/blocks'"))[0][0] == "1"
|
||||
node.query("INSERT INTO simple VALUES (0, 0)") # deduplication doesn't works here, the first hash node was deleted
|
||||
assert TSV.toMat(node.query("SELECT count() FROM simple"))[0][0] == "3"
|
||||
|
||||
|
||||
def check_timeout_exception(e):
|
||||
s = str(e)
|
||||
#print s
|
||||
assert s.find('timed out!') >= 0 or s.find('Return code: -9') >= 0
|
||||
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
|
||||
|
||||
|
||||
# Currently this test just reproduces incorrect behavior that should be fixed
|
||||
@ -67,6 +55,12 @@ def test_deduplication_works_in_case_of_intensive_inserts(started_cluster):
|
||||
inserters = []
|
||||
fetchers = []
|
||||
|
||||
node1.query("""
|
||||
CREATE TABLE simple ON CLUSTER test_cluster (date Date, id UInt32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}', date, id, 8192)""")
|
||||
|
||||
node1.query("INSERT INTO simple VALUES (0, 0)")
|
||||
|
||||
for node in nodes:
|
||||
host = node.ip_address
|
||||
|
||||
@ -81,7 +75,7 @@ done
|
||||
set -e
|
||||
for i in `seq 1000`; do
|
||||
res=`clickhouse-client --host {} -q "SELECT count() FROM simple"`
|
||||
if [[ $res -ne 1 ]]; then
|
||||
if [[ $? -ne 0 || $res -ne 1 ]]; then
|
||||
echo "Selected $res elements! Host: {}" 1>&2
|
||||
exit -1
|
||||
fi;
|
||||
@ -92,14 +86,16 @@ done
|
||||
for inserter in inserters:
|
||||
try:
|
||||
inserter.get_answer()
|
||||
except Exception as e:
|
||||
check_timeout_exception(e)
|
||||
except QueryTimeoutExceedException:
|
||||
# Only timeout is accepted
|
||||
pass
|
||||
|
||||
# There were not errors during SELECTs
|
||||
for fetcher in fetchers:
|
||||
try:
|
||||
fetcher.get_answer()
|
||||
except Exception as e:
|
||||
except QueryTimeoutExceedException:
|
||||
# Only timeout is accepted
|
||||
pass
|
||||
# Uncomment when problem will be fixed
|
||||
# check_timeout_exception(e)
|
||||
|
||||
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
|
||||
|
@ -0,0 +1,8 @@
|
||||
<yandex>
|
||||
<merge_tree>
|
||||
<replicated_deduplication_window>999999999</replicated_deduplication_window>
|
||||
<replicated_deduplication_window_seconds>999999999</replicated_deduplication_window_seconds>
|
||||
<cleanup_delay_period>10</cleanup_delay_period>
|
||||
<old_parts_lifetime>1</old_parts_lifetime>
|
||||
</merge_tree>
|
||||
</yandex>
|
@ -0,0 +1,17 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<test_cluster>
|
||||
<shard>
|
||||
<internal_replication>true</internal_replication>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster>
|
||||
</remote_servers>
|
||||
</yandex>
|
61
dbms/tests/integration/test_random_inserts/test.py
Normal file
61
dbms/tests/integration/test_random_inserts/test.py
Normal file
@ -0,0 +1,61 @@
|
||||
import time
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.network import PartitionManager
|
||||
from helpers.test_tools import TSV
|
||||
from helpers.client import CommandRequest
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance('node1', config_dir='configs', with_zookeeper=True, macroses={"layer": 0, "shard": 0, "replica": 1})
|
||||
node2 = cluster.add_instance('node2', config_dir='configs', with_zookeeper=True, macroses={"layer": 0, "shard": 0, "replica": 2})
|
||||
nodes = [node1, node2]
|
||||
|
||||
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, hand it to the tests, then shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Executed on normal teardown and also when startup or a test raises.
        cluster.shutdown()
|
||||
|
||||
def test_random_inserts(started_cluster):
|
||||
# Duration of the test, reduce it if don't want to wait
|
||||
DURATION_SECONDS = 10# * 60
|
||||
|
||||
node1.query("""
|
||||
CREATE TABLE simple ON CLUSTER test_cluster (date Date, i UInt32, s String)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}', date, i, 8192)""")
|
||||
|
||||
with PartitionManager() as pm_random_drops:
|
||||
for sacrifice in nodes:
|
||||
pass # This test doesn't work with partition problems still
|
||||
#pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
|
||||
#pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
|
||||
|
||||
min_timestamp = int(time.time())
|
||||
max_timestamp = min_timestamp + DURATION_SECONDS
|
||||
num_timestamps = max_timestamp - min_timestamp + 1
|
||||
|
||||
bash_script = os.path.join(os.path.dirname(__file__), "test.sh")
|
||||
inserters = []
|
||||
for node in nodes:
|
||||
cmd = ['/bin/bash', bash_script, node.ip_address, str(min_timestamp), str(max_timestamp)]
|
||||
inserters.append(CommandRequest(cmd, timeout=DURATION_SECONDS * 2, stdin=''))
|
||||
print node.name, node.ip_address
|
||||
|
||||
for inserter in inserters:
|
||||
inserter.get_answer()
|
||||
|
||||
answer="{}\t{}\t{}\t{}\n".format(num_timestamps, num_timestamps, min_timestamp, max_timestamp)
|
||||
for node in nodes:
|
||||
assert TSV(node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple")) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1")
|
||||
|
||||
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
|
45
dbms/tests/integration/test_random_inserts/test.sh
Executable file
45
dbms/tests/integration/test_random_inserts/test.sh
Executable file
@ -0,0 +1,45 @@
|
||||
#!/bin/bash
|
||||
#set -e
|
||||
|
||||
[[ -n "$1" ]] && host="$1" || host="127.0.0.1"
|
||||
[[ -n "$2" ]] && min_timestamp="$2" || min_timestamp=$(( $(date +%s) - 10 ))
|
||||
[[ -n "$3" ]] && max_timestamp="$3" || max_timestamp=$(( $(date +%s) + 10 ))
|
||||
|
||||
timestamps=`seq $min_timestamp $max_timestamp`
|
||||
|
||||
# Inserts one row (0, $1, '$1') into table `simple` on $host.
# The insert is repeated while the client output matches a retryable error
# ("Code: 319. ... Unknown status, client must retry" or "Code: 999. ").
# Any other failure, or 21 failed attempts in a row, terminates the whole script.
function reliable_insert {
    local ts="$1"
    num_tries=0
    until (( num_tries > 20 )); do
        #echo clickhouse-client --host $host -q "INSERT INTO simple VALUES (0, $ts, '$ts')"
        res=$(clickhouse-client --host $host -q "INSERT INTO simple VALUES (0, $ts, '$ts')" 2>&1)
        rt=$?
        num_tries=$(( num_tries + 1 ))

        # Success: nothing more to do for this timestamp.
        (( rt == 0 )) && return 0

        # Anything that is not a known retryable error is fatal.
        if [[ $res != *"Code: 319. "*"Unknown status, client must retry"* && $res != *"Code: 999. "* ]]; then
            echo FAIL "$res" 1>&2
            exit -1
        fi
    done

    echo "Too many retries" 1>&2
    exit -1
}
|
||||
|
||||
# Walk through the timestamps in order. Until the wall clock reaches timestamp $i, keep inserting
# randomly chosen timestamps from [min_timestamp, now]; then insert $i itself.
for i in $timestamps; do
    while cur_timestamp=$(date +%s); (( cur_timestamp < i )); do
        ts=$(shuf -i $min_timestamp-$cur_timestamp -n 1)
        reliable_insert "$ts"
    done

    #echo $i >> $host".txt"
    reliable_insert "$i"
done
|
@ -1,2 +1,3 @@
|
||||
1
|
||||
1
|
||||
\N
|
||||
|
@ -1,2 +1,3 @@
|
||||
SELECT CAST(1 AS Nullable(UInt8)) AS id WHERE id = CAST(1 AS Nullable(UInt8));
|
||||
SELECT CAST(1 AS Nullable(UInt8)) AS id WHERE id = 1;
|
||||
SELECT NULL == CAST(toUInt8(0) AS Nullable(UInt8));
|
||||
|
@ -0,0 +1,8 @@
|
||||
2
|
||||
3
|
||||
|
||||
2
|
||||
3
|
||||
|
||||
1
|
||||
1
|
@ -0,0 +1,51 @@
|
||||
SET experimental_allow_extended_storage_definition_syntax=1;
|
||||
|
||||
DROP TABLE IF EXISTS test.with_deduplication;
|
||||
DROP TABLE IF EXISTS test.without_deduplication;
|
||||
DROP TABLE IF EXISTS test.with_deduplication_mv;
|
||||
DROP TABLE IF EXISTS test.without_deduplication_mv;
|
||||
|
||||
CREATE TABLE test.with_deduplication(x UInt32)
|
||||
ENGINE ReplicatedMergeTree('/clickhouse/tables/test/with_deduplication', 'r1') ORDER BY x;
|
||||
CREATE TABLE test.without_deduplication(x UInt32)
|
||||
ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_deduplication', 'r1') ORDER BY x SETTINGS replicated_deduplication_window = 0;
|
||||
|
||||
CREATE MATERIALIZED VIEW test.with_deduplication_mv
|
||||
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/test/with_deduplication_mv', 'r1') ORDER BY dummy
|
||||
AS SELECT 0 AS dummy, countState(x) AS cnt FROM test.with_deduplication;
|
||||
CREATE MATERIALIZED VIEW test.without_deduplication_mv
|
||||
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/test/without_deduplication_mv', 'r1') ORDER BY dummy
|
||||
AS SELECT 0 AS dummy, countState(x) AS cnt FROM test.without_deduplication;
|
||||
|
||||
INSERT INTO test.with_deduplication VALUES (42);
|
||||
INSERT INTO test.with_deduplication VALUES (42);
|
||||
INSERT INTO test.with_deduplication VALUES (43);
|
||||
|
||||
INSERT INTO test.without_deduplication VALUES (42);
|
||||
INSERT INTO test.without_deduplication VALUES (42);
|
||||
INSERT INTO test.without_deduplication VALUES (43);
|
||||
|
||||
SELECT count() FROM test.with_deduplication;
|
||||
SELECT count() FROM test.without_deduplication;
|
||||
|
||||
-- Implicit insert isn't deduplicated
|
||||
SELECT '';
|
||||
SELECT countMerge(cnt) FROM test.with_deduplication_mv;
|
||||
SELECT countMerge(cnt) FROM test.without_deduplication_mv;
|
||||
|
||||
-- Explicit insert is deduplicated
|
||||
ALTER TABLE test.`.inner.with_deduplication_mv` DROP PARTITION ID 'all';
|
||||
ALTER TABLE test.`.inner.without_deduplication_mv` DROP PARTITION ID 'all';
|
||||
INSERT INTO test.`.inner.with_deduplication_mv` SELECT 0 AS dummy, arrayReduce('countState', [toUInt32(42)]) AS cnt;
|
||||
INSERT INTO test.`.inner.with_deduplication_mv` SELECT 0 AS dummy, arrayReduce('countState', [toUInt32(42)]) AS cnt;
|
||||
INSERT INTO test.`.inner.without_deduplication_mv` SELECT 0 AS dummy, arrayReduce('countState', [toUInt32(42)]) AS cnt;
|
||||
INSERT INTO test.`.inner.without_deduplication_mv` SELECT 0 AS dummy, arrayReduce('countState', [toUInt32(42)]) AS cnt;
|
||||
|
||||
SELECT '';
|
||||
SELECT countMerge(cnt) FROM test.with_deduplication_mv;
|
||||
SELECT countMerge(cnt) FROM test.without_deduplication_mv;
|
||||
|
||||
DROP TABLE IF EXISTS test.with_deduplication;
|
||||
DROP TABLE IF EXISTS test.without_deduplication;
|
||||
DROP TABLE IF EXISTS test.with_deduplication_mv;
|
||||
DROP TABLE IF EXISTS test.without_deduplication_mv;
|
21
docs/en/formats/capnproto.rst
Normal file
21
docs/en/formats/capnproto.rst
Normal file
@ -0,0 +1,21 @@
|
||||
CapnProto
|
||||
---------
|
||||
|
||||
Cap'n Proto is a binary message format. Like Protocol Buffers and Thrift (but unlike JSON or MessagePack), Cap'n Proto messages are strongly typed and not self-describing. Because of this, the format requires a ``schema`` setting that specifies the schema file and the root object. The schema is parsed at runtime and cached for each SQL statement.
|
||||
|
||||
.. code-block:: sql
|
||||
|
||||
SELECT SearchPhrase, count() AS c FROM test.hits GROUP BY SearchPhrase FORMAT CapnProto SETTINGS schema = 'schema.capnp:Message'
|
||||
|
||||
When the schema file looks like:
|
||||
|
||||
.. code-block:: text
|
||||
|
||||
struct Message {
|
||||
SearchPhrase @0 :Text;
|
||||
c @1 :UInt64;
|
||||
}
|
||||
|
||||
Deserialization is almost as efficient as the binary rows format, with typically zero allocation overhead per message.
|
||||
|
||||
You can use this format as an efficient exchange message format in your data processing pipeline.
|
125
libs/libcommon/include/common/RangeFiltered.h
Normal file
125
libs/libcommon/include/common/RangeFiltered.h
Normal file
@ -0,0 +1,125 @@
|
||||
#pragma once
|
||||
#include <iterator>
#include <type_traits>
#include <utility>
|
||||
|
||||
|
||||
/// A lazily filtered, read-only view over a container.
/// Similar to boost::filtered_range but a little bit easier, and it allows converting ordinary iterators to filtered ones.
///
/// The view stores a reference to the container (no copy is made), so the container must outlive
/// the view and every iterator obtained from it.
template <typename F, typename C>
struct RangeFiltered
{
    /// The container is held by const reference, so only its const iterators can be obtained from it.
    /// For containers such as std::set this is the same type as C::iterator.
    using RawIterator = typename C::const_iterator;
    class Iterator;

    /// Will iterate over elements for which filter(*it) == true
    RangeFiltered(F && filter_, const C & container_)
        : filter(std::move(filter_)), container(container_) {}

    /// First element accepted by the filter, or end() if there is none.
    Iterator begin() const
    {
        return {*this, std::begin(container)};
    }

    Iterator end() const
    {
        return {*this, std::end(container)};
    }

    /// Convert ordinary iterator to filtered one.
    /// Real position will be in range [ordinary_iterator; end()], so it is suitable to use with lower_bound() / upper_bound().
    inline Iterator convert(RawIterator ordinary_iterator) const
    {
        return {*this, ordinary_iterator};
    }


    /// It is similar to boost::filtered_iterator, but has additional features:
    ///  it doesn't store end() iterator
    ///  it doesn't store predicate, so it allows to implement operator=()
    ///  it guarantees that operator++() works properly in case of filter(*it) == false
    class Iterator
    {
    public:
        using Range = RangeFiltered<F, C>;

        using self_type = Iterator;
        using value_type = typename std::iterator_traits<RawIterator>::value_type;
        using reference = typename std::iterator_traits<RawIterator>::reference;
        using const_reference = const value_type &;
        using pointer = typename std::iterator_traits<RawIterator>::pointer;
        using const_pointer = const value_type *;
        using difference_type = typename std::iterator_traits<RawIterator>::difference_type;
        using iterator_category = std::bidirectional_iterator_tag;

        /// Positions the iterator on the first accepted element in [iter_; end()].
        Iterator(const Range & range_, RawIterator iter_)
            : range(&range_), iter(iter_)
        {
            skipRejected();
        }

        Iterator(const Iterator & rhs) = default;
        Iterator(Iterator && rhs) noexcept = default;

        /// Prefix increment: returns a reference to itself, as standard iterators do.
        Iterator & operator++()
        {
            ++iter;
            skipRejected();
            return *this;
        }

        /// Prefix decrement. The caller must guarantee that an accepted element precedes the current position.
        Iterator & operator--()
        {
            --iter;
            for (; !range->filter(*iter); --iter); /// Don't check std::begin() bound
            return *this;
        }

        pointer operator->()
        {
            return iter.operator->();
        }

        const_pointer operator->() const
        {
            return iter.operator->();
        }

        reference operator*()
        {
            return *iter;
        }

        const_reference operator*() const
        {
            return *iter;
        }

        bool operator==(const self_type & rhs) const
        {
            return iter == rhs.iter;
        }

        bool operator!=(const self_type & rhs) const
        {
            return iter != rhs.iter;
        }

        self_type & operator=(const self_type & rhs) = default;
        self_type & operator=(self_type && rhs) noexcept = default;

        ~Iterator() = default;

    private:
        /// Moves forward until the current element is accepted by the filter or end() is reached.
        void skipRejected()
        {
            const auto last = std::end(range->container);
            while (iter != last && !range->filter(*iter))
                ++iter;
        }

        const Range * range = nullptr;
        RawIterator iter;
    };

protected:
    F filter;
    const C & container;
};


/// Creates a filtered view over `container`. Only a reference to the container is stored,
/// so do not pass a temporary that dies before the view is last used.
/// The predicate is copied when it is an lvalue and moved when it is an rvalue.
template <typename F, typename C>
inline RangeFiltered<std::decay_t<F>, std::decay_t<C>> createRangeFiltered(F && filter, C && container)
{
    return {std::decay_t<F>(std::forward<F>(filter)), container};
}
|
Loading…
Reference in New Issue
Block a user