This commit is contained in:
Evgeniy Gatov 2014-11-10 17:45:10 +03:00
commit 741c1a822d
86 changed files with 2729 additions and 357 deletions

View File

@ -46,7 +46,7 @@ struct HashMapCell
static const Key & getKey(const value_type & value) { return value.first; }
bool keyEquals(const Key & key_) const { return value.first == key_; }
bool keyEquals(const HashMapCell & other) const { return value.first == other.value.first; }
bool keyEquals(const Key & key_, size_t hash_) const { return value.first == key_; }
void setHash(size_t hash_value) {}
size_t getHash(const Hash & hash) const { return hash(value.first); }
@ -107,7 +107,7 @@ struct HashMapCellWithSavedHash : public HashMapCell<Key, TMapped, Hash, TState>
HashMapCellWithSavedHash(const typename Base::value_type & value_, const typename Base::State & state) : Base(value_, state) {}
bool keyEquals(const Key & key_) const { return this->value.first == key_; }
bool keyEquals(const HashMapCellWithSavedHash & other) const { return saved_hash == other.saved_hash && this->value.first == other.value.first; }
bool keyEquals(const Key & key_, size_t hash_) const { return saved_hash == hash_ && this->value.first == key_; }
void setHash(size_t hash_value) { saved_hash = hash_value; }
size_t getHash(const Hash & hash) const { return saved_hash; }

View File

@ -69,7 +69,7 @@ struct HashSetCellWithSavedHash : public HashTableCell<Key, Hash, TState>
HashSetCellWithSavedHash(const Key & key_, const typename Base::State & state) : Base(key_, state) {}
bool keyEquals(const Key & key_) const { return this->key == key_; }
bool keyEquals(const HashSetCellWithSavedHash & other) const { return saved_hash == other.saved_hash && this->key == other.key; }
bool keyEquals(const Key & key_, size_t hash_) const { return saved_hash == hash_ && this->key == key_; }
void setHash(size_t hash_value) { saved_hash = hash_value; }
size_t getHash(const Hash & hash) const { return saved_hash; }

View File

@ -95,7 +95,7 @@ struct HashTableCell
/// Равны ли ключи у ячеек.
bool keyEquals(const Key & key_) const { return key == key_; }
bool keyEquals(const HashTableCell & other) const { return key == other.key; }
bool keyEquals(const Key & key_, size_t hash_) const { return key == key_; }
/// Если ячейка умеет запоминать в себе значение хэш-функции, то запомнить его.
void setHash(size_t hash_value) {}
@ -244,9 +244,9 @@ protected:
#endif
/// Найти ячейку с тем же ключём или пустую ячейку, начиная с заданного места и далее по цепочке разрешения коллизий.
size_t findCell(const Key & x, size_t place_value) const
size_t findCell(const Key & x, size_t hash_value, size_t place_value) const
{
while (!buf[place_value].isZero(*this) && !buf[place_value].keyEquals(x))
while (!buf[place_value].isZero(*this) && !buf[place_value].keyEquals(x, hash_value))
{
place_value = grower.next(place_value);
#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
@ -338,14 +338,15 @@ protected:
*/
void reinsert(Cell & x)
{
size_t place_value = grower.place(x.getHash(*this));
size_t hash_value = x.getHash(*this);
size_t place_value = grower.place(hash_value);
/// Если элемент на своём месте.
if (&x == &buf[place_value])
return;
/// Вычисление нового места, с учётом цепочки разрешения коллизий.
place_value = findCell(Cell::getKey(x.getValue()), place_value);
place_value = findCell(Cell::getKey(x.getValue()), hash_value, place_value);
/// Если элемент остался на своём месте в старой цепочке разрешения коллизий.
if (!buf[place_value].isZero(*this))
@ -515,7 +516,7 @@ protected:
/// Только для ненулевых ключей. Найти нужное место, вставить туда ключ, если его ещё нет, вернуть итератор на ячейку.
void emplaceNonZero(Key x, iterator & it, bool & inserted, size_t hash_value)
{
size_t place_value = findCell(x, grower.place(hash_value));
size_t place_value = findCell(x, hash_value, grower.place(hash_value));
it = iterator(this, &buf[place_value]);
@ -589,7 +590,8 @@ public:
if (Cell::isZero(x, *this))
return this->hasZero() ? iteratorToZero() : end();
size_t place_value = findCell(x, grower.place(hash(x)));
size_t hash_value = hash(x);
size_t place_value = findCell(x, hash_value, grower.place(hash_value));
return !buf[place_value].isZero(*this) ? iterator(this, &buf[place_value]) : end();
}
@ -600,7 +602,8 @@ public:
if (Cell::isZero(x, *this))
return this->hasZero() ? iteratorToZero() : end();
size_t place_value = findCell(x, grower.place(hash(x)));
size_t hash_value = hash(x);
size_t place_value = findCell(x, hash_value, grower.place(hash_value));
return !buf[place_value].isZero(*this) ? const_iterator(this, &buf[place_value]) : end();
}

View File

@ -9,6 +9,8 @@
#include <functional>
#include <ostream>
#include <emmintrin.h>
/// Штука, чтобы не создавать строки для поиска подстроки в хэш таблице.
struct StringRef
@ -26,18 +28,80 @@ struct StringRef
typedef std::vector<StringRef> StringRefs;
/** String equality comparison.
  * The approach is debatable and does not win in all cases.
  * See hash_map_string_2.cpp for details.
  */

/// Compare 16 bytes at p1 and p2 for exact equality (unaligned SSE2 loads).
inline bool compareSSE2(const char * p1, const char * p2)
{
	return 0xFFFF == _mm_movemask_epi8(_mm_cmpeq_epi8(
		_mm_loadu_si128(reinterpret_cast<const __m128i *>(p1)),
		_mm_loadu_si128(reinterpret_cast<const __m128i *>(p2))));
}

/// memcmp-for-equality over `size` bytes, processing 64 bytes per iteration.
inline bool memequalSSE2Wide(const char * p1, const char * p2, size_t size)
{
	while (size >= 64)
	{
		/// Fixed: the fourth chunk must start at offset 48, not 40 — the old code
		/// compared [40, 56) twice-overlapping region and left bytes [56, 64) of
		/// every 64-byte step unchecked, so unequal strings could compare equal.
		if (   compareSSE2(p1,      p2)
			&& compareSSE2(p1 + 16, p2 + 16)
			&& compareSSE2(p1 + 32, p2 + 32)
			&& compareSSE2(p1 + 48, p2 + 48))
		{
			p1 += 64;
			p2 += 64;
			size -= 64;
		}
		else
			return false;
	}

	/// Remaining full 16-byte chunks; cases intentionally fall through.
	switch ((size % 64) / 16)
	{
		case 3: if (!compareSSE2(p1 + 32, p2 + 32)) return false;
		case 2: if (!compareSSE2(p1 + 16, p2 + 16)) return false;
		case 1: if (!compareSSE2(p1     , p2     )) return false;
		case 0: break;
	}

	p1 += (size % 64) / 16 * 16;
	p2 += (size % 64) / 16 * 16;

	/// Tail of fewer than 16 bytes; fallthrough is intentional
	/// (check the odd high bytes, then finish with one wide compare).
	switch (size % 16)
	{
		case 15: if (p1[14] != p2[14]) return false;
		case 14: if (p1[13] != p2[13]) return false;
		case 13: if (p1[12] != p2[12]) return false;
		case 12: if (reinterpret_cast<const uint32_t *>(p1)[2] == reinterpret_cast<const uint32_t *>(p2)[2]) goto l8; else return false;
		case 11: if (p1[10] != p2[10]) return false;
		case 10: if (p1[9] != p2[9]) return false;
		case 9:  if (p1[8] != p2[8]) return false;
	l8: case 8: return reinterpret_cast<const uint64_t *>(p1)[0] == reinterpret_cast<const uint64_t *>(p2)[0];
		case 7: if (p1[6] != p2[6]) return false;
		case 6: if (p1[5] != p2[5]) return false;
		case 5: if (p1[4] != p2[4]) return false;
		case 4: return reinterpret_cast<const uint32_t *>(p1)[0] == reinterpret_cast<const uint32_t *>(p2)[0];
		case 3: if (p1[2] != p2[2]) return false;
		case 2: return reinterpret_cast<const uint16_t *>(p1)[0] == reinterpret_cast<const uint16_t *>(p2)[0];
		case 1: if (p1[0] != p2[0]) return false;
		case 0: break;
	}

	return true;
}
inline bool operator== (StringRef lhs, StringRef rhs)
{
/// Так почему-то быстрее, чем return lhs.size == rhs.size && 0 == memcmp(lhs.data, rhs.data, lhs.size);
if (lhs.size != rhs.size)
return false;
for (size_t pos = 0; pos < lhs.size; ++pos)
if (lhs.data[pos] != rhs.data[pos])
return false;
if (lhs.size == 0)
return true;
return true;
return memequalSSE2Wide(lhs.data, rhs.data, lhs.size);
}
inline bool operator!= (StringRef lhs, StringRef rhs)
@ -58,6 +122,119 @@ inline bool operator> (StringRef lhs, StringRef rhs)
}
/** Hash functions.
  * You can use either CityHash64,
  * or a function based on the crc32 instruction,
  * which is certainly of lower quality, but on real data sets,
  * when used in a hash table, works noticeably faster.
  * See hash_map_string_3.cpp for details.
  */

#ifdef __SSE4_1__
#include <smmintrin.h>
#else

/// Fallback when the compiler is built without -msse4.1: emit the crc32q instruction directly.
/// NOTE(review): this bypasses only the compile-time check — the CPU must still support
/// the instruction at runtime, or the process will fault; confirm deployment targets.
inline uint64 _mm_crc32_u64(uint64 crc, uint64 value)
{
	asm("crc32q %[value], %[crc]\n" : [crc] "+r" (crc) : [value] "rm" (value));
	return crc;
}

#endif
/// Pieces taken from CityHash.

/// Mix two 64-bit values into one via CityHash's Hash128to64 over the pair (u, v).
inline uint64 hashLen16(uint64 u, uint64 v)
{
	return Hash128to64(uint128(u, v));
}
/// Fold the high bits of `val` into the low bits (xorshift step, as in CityHash).
inline uint64 shiftMix(uint64 val)
{
	const uint64 high_bits = val >> 47;
	return val ^ high_bits;
}
/// Rotate right by `shift`, which must be in [1, 63] (hence "AtLeast1"):
/// with shift == 0 the sub-expression `val << 64` would be undefined behavior.
inline uint64 rotateByAtLeast1(uint64 val, int shift)
{
	return (val >> shift) | (val << (64 - shift));
}
/// Hash for strings shorter than 8 bytes (adapted from CityHash's short-string pieces).
inline size_t hashLessThan8(const char * data, size_t size)
{
	static constexpr uint64 k2 = 0x9ae16a3b2f90404fULL;
	static constexpr uint64 k3 = 0xc949d7c7509e6557ULL;

	if (size >= 4)
	{
		/// Two overlapping 4-byte reads cover the whole string.
		/// (Removed a stray double semicolon left after the load.)
		uint64 a = *reinterpret_cast<const uint32_t *>(data);
		return hashLen16(size + (a << 3), *reinterpret_cast<const uint32_t *>(data + size - 4));
	}

	if (size > 0)
	{
		/// 1-3 bytes: mix first, middle and last byte with the length.
		uint8 a = data[0];
		uint8 b = data[size >> 1];
		uint8 c = data[size - 1];
		uint32 y = static_cast<uint32>(a) + (static_cast<uint32>(b) << 8);
		uint32 z = size + (static_cast<uint32>(c) << 2);
		return shiftMix(y * k2 ^ z * k3) * k2;
	}

	/// Empty string hashes to a fixed constant.
	return k2;
}
/// Hash for strings shorter than 16 bytes; delegates to hashLessThan8 for sizes <= 8.
inline size_t hashLessThan16(const char * data, size_t size)
{
	if (size > 8)
	{
		/// Two overlapping 8-byte reads cover the whole string (9..15 bytes).
		uint64 a = *reinterpret_cast<const uint64_t *>(data);
		uint64 b = *reinterpret_cast<const uint64_t *>(data + size - 8);
		return hashLen16(a, rotateByAtLeast1(b + size, size)) ^ b;
	}

	return hashLessThan8(data, size);
}
/// Hash for StringRef based on the crc32 instruction: lower quality than CityHash64,
/// but noticeably faster when used in a hash table (see the comment above).
struct CRC32Hash
{
	size_t operator() (StringRef x) const
	{
		const char * pos = x.data;
		size_t size = x.size;

		if (size == 0)
			return 0;

		if (size < 8)
		{
			return hashLessThan8(x.data, x.size);
		}

		const char * end = pos + size;
		size_t res = -1ULL;

		/// Fold 8 bytes at a time; the last word is read from the end and may
		/// overlap the previous one so the tail needs no special casing.
		do
		{
			uint64_t word = *reinterpret_cast<const uint64_t *>(pos);
			res = _mm_crc32_u64(res, word);

			pos += 8;
		} while (pos + 8 < end);

		uint64_t word = *reinterpret_cast<const uint64_t *>(end - 8);	/// Author's note: not sure this (overlapping re-read) is OK.
		res = _mm_crc32_u64(res, word);

		return res;
	}
};
#if 1
struct StringRefHash : CRC32Hash {};
#else
struct StringRefHash
{
size_t operator() (StringRef x) const
@ -66,6 +243,8 @@ struct StringRefHash
}
};
#endif
namespace std
{
@ -76,8 +255,8 @@ namespace std
namespace ZeroTraits
{
inline bool check(StringRef x) { return nullptr == x.data; }
inline void set(StringRef & x) { x.data = nullptr; }
inline bool check(StringRef x) { return 0 == x.size; }
inline void set(StringRef & x) { x.size = 0; }
};

View File

@ -25,9 +25,9 @@ public:
children.push_back(input_);
}
String getName() const { return "AddingConstColumnBlockInputStream"; }
String getName() const override { return "AddingConstColumnBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "AddingConstColumn(" << children.back()->getID() << ")";
@ -35,7 +35,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
Block res = children.back()->read();
if (!res)

View File

@ -25,9 +25,9 @@ public:
children.push_back(input_);
}
String getName() const { return "AddingDefaultBlockInputStream"; }
String getName() const override { return "AddingDefaultBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "AddingDefault(" << children.back()->getID();
@ -40,7 +40,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
Block res = children.back()->read();
if (!res)

View File

@ -31,7 +31,7 @@ public:
output->write(res);
}
void flush() { output->flush(); }
void flush() override { output->flush(); }
void writePrefix() override { output->writePrefix(); }
void writeSuffix() override { output->writeSuffix(); }

View File

@ -33,9 +33,9 @@ public:
AggregatingBlockInputStream(BlockInputStreamPtr input_, const Names & key_names, const AggregateDescriptions & aggregates,
bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_);
String getName() const { return "AggregatingBlockInputStream"; }
String getName() const override { return "AggregatingBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Aggregating(" << children.back()->getID() << ", " << aggregator->getID() << ")";
@ -43,7 +43,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
SharedPtr<Aggregator> aggregator;
bool final;

View File

@ -27,9 +27,9 @@ public:
{
}
String getName() const { return "AggregatingSortedBlockInputStream"; }
String getName() const override { return "AggregatingSortedBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "AggregatingSorted(inputs";
@ -48,7 +48,7 @@ public:
protected:
/// Может возвращаться на 1 больше записей, чем max_block_size.
Block readImpl();
Block readImpl() override;
private:
Logger * log;

View File

@ -25,23 +25,23 @@ public:
children.push_back(in_);
}
String getName() const { return "AsynchronousBlockInputStream"; }
String getName() const override { return "AsynchronousBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Asynchronous(" << children.back()->getID() << ")";
return res.str();
}
void readPrefix()
void readPrefix() override
{
children.back()->readPrefix();
next();
started = true;
}
void readSuffix()
void readSuffix() override
{
if (started)
{
@ -69,7 +69,7 @@ public:
}
~AsynchronousBlockInputStream()
~AsynchronousBlockInputStream() override
{
if (started)
pool.wait();
@ -84,7 +84,7 @@ protected:
ExceptionPtr exception;
Block readImpl()
Block readImpl() override
{
/// Если вычислений ещё не было - вычислим первый блок синхронно
if (!started)

View File

@ -17,7 +17,7 @@ class BinaryRowInputStream : public IRowInputStream
public:
BinaryRowInputStream(ReadBuffer & istr_, const Block & sample_);
bool read(Row & row);
bool read(Row & row) override;
private:
ReadBuffer & istr;

View File

@ -17,10 +17,10 @@ class BinaryRowOutputStream : public IRowOutputStream
public:
BinaryRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
void writeField(const Field & field);
void writeRowEndDelimiter();
void writeField(const Field & field) override;
void writeRowEndDelimiter() override;
void flush() { ostr.next(); }
void flush() override { ostr.next(); }
protected:
WriteBuffer & ostr;

View File

@ -20,12 +20,12 @@ public:
const Block & sample_,
size_t max_block_size_ = DEFAULT_INSERT_BLOCK_SIZE); /// Обычно дамп читается в целях вставки в таблицу.
void readPrefix() { row_input->readPrefix(); }
void readSuffix() { row_input->readSuffix(); }
void readPrefix() override { row_input->readPrefix(); }
void readSuffix() override { row_input->readSuffix(); }
String getName() const { return "BlockInputStreamFromRowInputStream"; }
String getName() const override { return "BlockInputStreamFromRowInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << this;
@ -33,7 +33,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
RowInputStreamPtr row_input;

View File

@ -14,15 +14,15 @@ class BlockOutputStreamFromRowOutputStream : public IBlockOutputStream
{
public:
BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_);
void write(const Block & block);
void writePrefix() { row_output->writePrefix(); }
void writeSuffix() { row_output->writeSuffix(); }
void write(const Block & block) override;
void writePrefix() override { row_output->writePrefix(); }
void writeSuffix() override { row_output->writeSuffix(); }
void flush() { row_output->flush(); }
void flush() override { row_output->flush(); }
void setRowsBeforeLimit(size_t rows_before_limit);
void setTotals(const Block & totals);
void setExtremes(const Block & extremes);
void setRowsBeforeLimit(size_t rows_before_limit) override;
void setTotals(const Block & totals) override;
void setExtremes(const Block & extremes) override;
private:
RowOutputStreamPtr row_output;

View File

@ -25,9 +25,9 @@ public:
~CollapsingFinalBlockInputStream();
String getName() const { return "CollapsingFinalBlockInputStream"; }
String getName() const override { return "CollapsingFinalBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "CollapsingFinal(inputs";
@ -45,7 +45,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
struct MergingBlock;

View File

@ -32,9 +32,9 @@ public:
{
}
String getName() const { return "CollapsingSortedBlockInputStream"; }
String getName() const override { return "CollapsingSortedBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "CollapsingSorted(inputs";
@ -53,7 +53,7 @@ public:
protected:
/// Может возвращаться на 1 больше записей, чем max_block_size.
Block readImpl();
Block readImpl() override;
private:
String sign_column;

View File

@ -22,9 +22,9 @@ public:
current_stream = children.begin();
}
String getName() const { return "ConcatBlockInputStream"; }
String getName() const override { return "ConcatBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Concat(";
@ -44,7 +44,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
Block res;

View File

@ -9,7 +9,7 @@
namespace DB
{
/** Отдает без изменений данные из потока блоков, но перед чтением первого блока инициализирует все переданные множества.
/** Отдаёт без изменений данные из потока блоков, но перед чтением первого блока инициализирует все переданные множества.
*/
class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
{
@ -29,9 +29,9 @@ public:
children.push_back(input);
}
String getName() const { return "CreatingSetsBlockInputStream"; }
String getName() const override { return "CreatingSetsBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "CreatingSets(";
@ -51,7 +51,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
SubqueriesForSets subqueries_for_sets;

View File

@ -28,9 +28,9 @@ public:
children.push_back(input_);
}
String getName() const { return "DistinctBlockInputStream"; }
String getName() const override { return "DistinctBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Distinct(" << children.back()->getID() << ")";
@ -38,7 +38,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
/// Пока не встретится блок, после фильтрации которого что-нибудь останется, или поток не закончится.
while (1)

View File

@ -13,7 +13,7 @@ namespace DB
class EmptyBlockOutputStream : public IBlockOutputStream
{
public:
void write(const Block & block)
void write(const Block & block) override
{
throw Exception("Cannot write to EmptyBlockOutputStream", ErrorCodes::CANNOT_WRITE_TO_EMPTY_BLOCK_OUTPUT_STREAM);
}

View File

@ -26,16 +26,16 @@ public:
children.push_back(input_);
}
String getName() const { return "ExpressionBlockInputStream"; }
String getName() const override { return "ExpressionBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Expression(" << children.back()->getID() << ", " << expression->getID() << ")";
return res.str();
}
const Block & getTotals()
const Block & getTotals() override
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
@ -49,7 +49,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
Block res = children.back()->read();
if (!res)

View File

@ -22,9 +22,9 @@ public:
FilterBlockInputStream(BlockInputStreamPtr input_, ssize_t filter_column_);
FilterBlockInputStream(BlockInputStreamPtr input_, const String & filter_column_name_);
String getName() const { return "FilterBlockInputStream"; }
String getName() const override { return "FilterBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Filter(" << children.back()->getID() << ", " << filter_column << ", " << filter_column_name << ")";
@ -32,7 +32,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
ssize_t filter_column;

View File

@ -71,14 +71,14 @@ private:
class IProfilingBlockInputStream : public IBlockInputStream
{
public:
Block read();
Block read() override;
/** Реализация по-умолчанию вызывает рекурсивно readSuffix() у всех детей, а затем readSuffixImpl() у себя.
* Если этот поток вызывает у детей read() в отдельном потоке, этот поведение обычно неверно:
* readSuffix() у ребенка нельзя вызывать в момент, когда read() того же ребенка выполняется в другом потоке.
* В таком случае нужно переопределить этот метод, чтобы readSuffix() у детей вызывался, например, после соединения потоков.
*/
void readSuffix();
void readSuffix() override;
/// Получить информацию о скорости выполнения.
const BlockStreamProfileInfo & getInfo() const { return info; }

View File

@ -18,14 +18,14 @@ class JSONCompactRowOutputStream : public JSONRowOutputStream
public:
JSONCompactRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
void writeField(const Field & field);
void writeFieldDelimiter();
void writeRowStartDelimiter();
void writeRowEndDelimiter();
void writeField(const Field & field) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
protected:
void writeTotals();
void writeExtremes();
void writeTotals() override;
void writeExtremes() override;
};
}

View File

@ -19,23 +19,23 @@ class JSONRowOutputStream : public IRowOutputStream
public:
JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
void writeField(const Field & field);
void writeFieldDelimiter();
void writeRowStartDelimiter();
void writeRowEndDelimiter();
void writePrefix();
void writeSuffix();
void writeField(const Field & field) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writePrefix() override;
void writeSuffix() override;
void flush() { ostr.next(); dst_ostr.next(); }
void flush() override { ostr.next(); dst_ostr.next(); }
void setRowsBeforeLimit(size_t rows_before_limit_)
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
applied_limit = true;
rows_before_limit = rows_before_limit_;
}
void setTotals(const Block & totals_) { totals = totals_; }
void setExtremes(const Block & extremes_) { extremes = extremes_; }
void setTotals(const Block & totals_) override { totals = totals_; }
void setExtremes(const Block & extremes_) override { extremes = extremes_; }
protected:

View File

@ -18,9 +18,9 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
public:
LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_ = 0);
String getName() const { return "LimitBlockInputStream"; }
String getName() const override { return "LimitBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Limit(" << children.back()->getID() << ", " << limit << ", " << offset << ")";
@ -28,7 +28,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
size_t limit;

View File

@ -18,9 +18,9 @@ public:
children.push_back(input_);
}
String getName() const { return "MaterializingBlockInputStream"; }
String getName() const override { return "MaterializingBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Materializing(" << children.back()->getID() << ")";
@ -28,7 +28,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
Block res = children.back()->read();

View File

@ -22,9 +22,9 @@ public:
children.push_back(input_);
}
String getName() const { return "MergeSortingBlockInputStream"; }
String getName() const override { return "MergeSortingBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "MergeSorting(" << children.back()->getID();
@ -37,7 +37,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
SortDescription description;

View File

@ -30,9 +30,9 @@ public:
children.push_back(input_);
}
String getName() const { return "MergingAggregatedBlockInputStream"; }
String getName() const override { return "MergingAggregatedBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "MergingAggregated(" << children.back()->getID() << ", " << aggregator->getID() << ")";
@ -40,7 +40,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
SharedPtr<Aggregator> aggregator;

View File

@ -26,9 +26,9 @@ public:
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
String getName() const { return "MergingSortedBlockInputStream"; }
String getName() const override { return "MergingSortedBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "MergingSorted(";
@ -51,8 +51,8 @@ public:
}
protected:
Block readImpl();
void readSuffixImpl();
Block readImpl() override;
void readSuffixImpl() override;
/// Инициализирует очередь и следующий блок результата.
void init(Block & merged_block, ColumnPlainPtrs & merged_columns);

View File

@ -16,9 +16,9 @@ public:
NativeBlockInputStream(ReadBuffer & istr_, const DataTypeFactory & data_type_factory_)
: istr(istr_), data_type_factory(data_type_factory_) {}
String getName() const { return "NativeBlockInputStream"; }
String getName() const override { return "NativeBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << this;
@ -26,7 +26,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
ReadBuffer & istr;

View File

@ -13,9 +13,9 @@ class NativeBlockOutputStream : public IBlockOutputStream
{
public:
NativeBlockOutputStream(WriteBuffer & ostr_) : ostr(ostr_) {}
void write(const Block & block);
void flush() { ostr.next(); }
void write(const Block & block) override;
void flush() override { ostr.next(); }
private:
WriteBuffer & ostr;

View File

@ -0,0 +1,46 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
namespace DB
{

/** An empty stream of blocks.
  * But on the first read attempt, it copies the data from the given input into the given output.
  * This is needed to execute an INSERT SELECT query: the query copies data, but returns nothing itself.
  * The query could be executed without wrapping it into an empty BlockInputStream,
  * but then query progress reporting and the ability to cancel the query would not work.
  */
class NullAndDoCopyBlockInputStream : public IProfilingBlockInputStream
{
public:
	NullAndDoCopyBlockInputStream(BlockInputStreamPtr input_, BlockOutputStreamPtr output_)
		: input(input_), output(output_)
	{
		children.push_back(input_);
	}

	String getName() const override { return "NullAndDoCopyBlockInputStream"; }

	String getID() const override
	{
		std::stringstream res;
		res << "copy from " << input->getID();
		return res.str();
	}

protected:
	/// Performs the whole copy on the first call, then signals end-of-stream with an empty block.
	Block readImpl() override
	{
		copyData(*input, *output);
		return Block();
	}

private:
	BlockInputStreamPtr input;
	BlockOutputStreamPtr output;
};

}

View File

@ -11,10 +11,10 @@ namespace DB
class NullBlockInputStream : public IBlockInputStream
{
public:
Block read() { return Block(); }
String getName() const { return "NullBlockInputStream"; }
Block read() override { return Block(); }
String getName() const override { return "NullBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << this;

View File

@ -11,7 +11,7 @@ namespace DB
class NullBlockOutputStream : public IBlockOutputStream
{
public:
void write(const Block & block) {}
void write(const Block & block) override {}
};
}

View File

@ -18,9 +18,9 @@ class OneBlockInputStream : public IProfilingBlockInputStream
public:
OneBlockInputStream(const Block & block_) : block(block_), has_been_read(false) {}
String getName() const { return "OneBlockInputStream"; }
String getName() const override { return "OneBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << this;
@ -28,7 +28,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
if (has_been_read)
return Block();

View File

@ -41,9 +41,9 @@ public:
aggregator = new Aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_);
}
String getName() const { return "ParallelAggregatingBlockInputStream"; }
String getName() const override { return "ParallelAggregatingBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "ParallelAggregating(";
@ -63,7 +63,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
if (has_been_read)
return Block();

View File

@ -21,9 +21,9 @@ public:
children.push_back(input_);
}
String getName() const { return "PartialSortingBlockInputStream"; }
String getName() const override { return "PartialSortingBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "PartialSorting(" << children.back()->getID();
@ -36,7 +36,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
SortDescription description;

View File

@ -15,13 +15,14 @@ class PrettyBlockOutputStream : public IBlockOutputStream
public:
/// no_escapes - не использовать ANSI escape sequences - для отображения в браузере, а не в консоли.
PrettyBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_ = false, size_t max_rows_ = PRETTY_FORMAT_DEFAULT_MAX_ROWS);
void write(const Block & block);
void writeSuffix();
void flush() { ostr.next(); }
void write(const Block & block) override;
void writeSuffix() override;
void setTotals(const Block & totals_) { totals = totals_; }
void setExtremes(const Block & extremes_) { extremes = extremes_; }
void flush() override { ostr.next(); }
void setTotals(const Block & totals_) override { totals = totals_; }
void setExtremes(const Block & extremes_) override { extremes = extremes_; }
protected:
void writeTotals();

View File

@ -14,7 +14,7 @@ public:
PrettyCompactBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_ = false, size_t max_rows_ = PRETTY_FORMAT_DEFAULT_MAX_ROWS)
: PrettyBlockOutputStream(ostr_, no_escapes_, max_rows_) {}
void write(const Block & block);
void write(const Block & block) override;
protected:
void writeHeader(const Block & block, const Widths_t & max_widths, const Widths_t & name_widths);

View File

@ -15,8 +15,8 @@ public:
PrettyCompactMonoBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_ = false, size_t max_rows_ = PRETTY_FORMAT_DEFAULT_MAX_ROWS)
: PrettyCompactBlockOutputStream(ostr_, no_escapes_, max_rows_) {}
void write(const Block & block);
void writeSuffix();
void write(const Block & block) override;
void writeSuffix() override;
private:
typedef std::vector<Block> Blocks_t;

View File

@ -14,8 +14,8 @@ public:
PrettySpaceBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_ = false, size_t max_rows_ = PRETTY_FORMAT_DEFAULT_MAX_ROWS)
: PrettyBlockOutputStream(ostr_, no_escapes_, max_rows_) {}
void write(const Block & block);
void writeSuffix();
void write(const Block & block) override;
void writeSuffix() override;
};
}

View File

@ -31,28 +31,28 @@ public:
QueueBlockIOStream(size_t queue_size_ = std::numeric_limits<int>::max())
: queue_size(queue_size_), queue(queue_size) {}
String getName() const { return "QueueBlockIOStream"; }
String getName() const override { return "QueueBlockIOStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
void write(const Block & block)
void write(const Block & block) override
{
queue.push(block);
}
void cancel()
void cancel() override
{
IProfilingBlockInputStream::cancel();
queue.clear();
}
protected:
Block readImpl()
Block readImpl() override
{
Block res;
queue.pop(res);

View File

@ -52,10 +52,10 @@ public:
}
String getName() const { return "RemoteBlockInputStream"; }
String getName() const override { return "RemoteBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << this;
@ -69,7 +69,7 @@ public:
void progress(const Progress & value) override {}
void cancel()
void cancel() override
{
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
@ -85,7 +85,7 @@ public:
}
~RemoteBlockInputStream()
~RemoteBlockInputStream() override
{
/** Если прервались в середине цикла общения с сервером, то закрываем соединение,
* чтобы оно не осталось висеть в рассихронизированном состоянии.
@ -112,7 +112,7 @@ protected:
connection->sendExternalTablesData(res);
}
Block readImpl()
Block readImpl() override
{
if (!sent_query)
{
@ -181,7 +181,7 @@ protected:
}
}
void readSuffixImpl()
void readSuffixImpl() override
{
/** Если одно из:
* - ничего не начинали делать;

View File

@ -24,9 +24,9 @@ public:
children.push_back(input_);
}
String getName() const { return "RemoveColumnsBlockInputStream"; }
String getName() const override { return "RemoveColumnsBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "RemoveColumns(" << children.back()->getID();
@ -39,7 +39,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
Block res = children.back()->read();
if (!res)

View File

@ -14,10 +14,11 @@ class RowInputStreamFromBlockInputStream : public IRowInputStream
{
public:
explicit RowInputStreamFromBlockInputStream(BlockInputStreamPtr block_input_);
bool read(Row & row);
void readPrefix() { block_input->readPrefix(); };
void readSuffix() { block_input->readSuffix(); };
bool read(Row & row) override;
void readPrefix() override { block_input->readPrefix(); };
void readSuffix() override { block_input->readSuffix(); };
private:
BlockInputStreamPtr block_input;

View File

@ -37,9 +37,9 @@ public:
children.push_back(input_);
}
String getName() const { return "SplittingAggregatingBlockInputStream"; }
String getName() const override { return "SplittingAggregatingBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "SplittingAggregating(" << children.back()->getID() << ", " << aggregator->getID() << ")";
@ -47,7 +47,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
if (!started)
{

View File

@ -24,9 +24,9 @@ public:
{
}
String getName() const { return "SummingSortedBlockInputStream"; }
String getName() const override { return "SummingSortedBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "SummingSorted(inputs";
@ -45,7 +45,7 @@ public:
protected:
/// Может возвращаться на 1 больше записей, чем max_block_size.
Block readImpl();
Block readImpl() override;
private:
Logger * log;

View File

@ -14,9 +14,9 @@ class TabSeparatedBlockOutputStream : public IBlockOutputStream
{
public:
TabSeparatedBlockOutputStream(WriteBuffer & ostr_) : ostr(ostr_) {}
void write(const Block & block);
void flush() { ostr.next(); }
void write(const Block & block) override;
void flush() override { ostr.next(); }
private:
WriteBuffer & ostr;

View File

@ -15,7 +15,7 @@ public:
TabSeparatedRawRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool with_names_ = false, bool with_types_ = false)
: TabSeparatedRowOutputStream(ostr_, sample_, with_names_, with_types_) {}
void writeField(const Field & field)
void writeField(const Field & field) override
{
data_types[field_number]->serializeText(field, ostr);
++field_number;

View File

@ -20,8 +20,8 @@ public:
*/
TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & sample_, bool with_names_ = false, bool with_types_ = false);
bool read(Row & row);
void readPrefix();
bool read(Row & row) override;
void readPrefix() override;
private:
ReadBuffer & istr;

View File

@ -20,16 +20,16 @@ public:
*/
TabSeparatedRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool with_names_ = false, bool with_types_ = false);
void writeField(const Field & field);
void writeFieldDelimiter();
void writeRowEndDelimiter();
void writePrefix();
void writeSuffix();
void writeField(const Field & field) override;
void writeFieldDelimiter() override;
void writeRowEndDelimiter() override;
void writePrefix() override;
void writeSuffix() override;
void flush() { ostr.next(); }
void flush() override { ostr.next(); }
void setTotals(const Block & totals_) { totals = totals_; }
void setExtremes(const Block & extremes_) { extremes = extremes_; }
void setTotals(const Block & totals_) override { totals = totals_; }
void setExtremes(const Block & extremes_) override { extremes = extremes_; }
protected:
void writeTotals();

View File

@ -28,9 +28,9 @@ public:
children.push_back(input_);
}
String getName() const { return "TotalsHavingBlockInputStream"; }
String getName() const override { return "TotalsHavingBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "TotalsHavingBlockInputStream(" << children.back()->getID() << ", " << aggregator->getID()
@ -38,10 +38,10 @@ public:
return res.str();
}
const Block & getTotals();
const Block & getTotals() override;
protected:
Block readImpl();
Block readImpl() override;
private:
SharedPtr<Aggregator> aggregator;

View File

@ -44,9 +44,9 @@ public:
input_queue.emplace(inputs_[i], i);
}
String getName() const { return "UnionBlockInputStream"; }
String getName() const override { return "UnionBlockInputStream"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Union(";
@ -66,7 +66,7 @@ public:
}
~UnionBlockInputStream()
~UnionBlockInputStream() override
{
try
{
@ -84,7 +84,7 @@ public:
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
* пропуская отвалившиеся по эксепшену.
*/
void cancel()
void cancel() override
{
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
@ -127,7 +127,7 @@ protected:
LOG_TRACE(log, "Waited for threads to finish");
}
Block readImpl()
Block readImpl() override
{
OutputData res;
if (all_read)
@ -153,7 +153,7 @@ protected:
return res.block;
}
void readSuffix()
void readSuffix() override
{
if (!all_read && !is_cancelled)
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);

View File

@ -20,7 +20,7 @@ class ValuesRowInputStream : public IRowInputStream
public:
ValuesRowInputStream(ReadBuffer & istr_, const Block & sample_);
bool read(Row & row);
bool read(Row & row) override;
private:
ReadBuffer & istr;

View File

@ -20,13 +20,13 @@ class ValuesRowOutputStream : public IRowOutputStream
public:
ValuesRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
void writeField(const Field & field);
void writeFieldDelimiter();
void writeRowStartDelimiter();
void writeRowEndDelimiter();
void writeRowBetweenDelimiter();
void writeField(const Field & field) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
void flush() { ostr.next(); }
void flush() override { ostr.next(); }
private:
WriteBuffer & ostr;

View File

@ -21,11 +21,11 @@ class VerticalRowOutputStream : public IRowOutputStream
public:
VerticalRowOutputStream(WriteBuffer & ostr_, const Block & sample_);
void writeField(const Field & field);
void writeRowStartDelimiter();
void writeRowBetweenDelimiter();
void writeField(const Field & field) override;
void writeRowStartDelimiter() override;
void writeRowBetweenDelimiter() override;
void flush() { ostr.next(); }
void flush() override { ostr.next(); }
private:
WriteBuffer & ostr;

View File

@ -131,7 +131,7 @@ struct SipHash64Impl
struct SipHash128Impl
{
static constexpr auto name = "SipHash128";
static constexpr auto name = "sipHash128";
static constexpr auto length = 16;
static void apply(const char * begin, const size_t size, unsigned char * out_char_data)

View File

@ -12,6 +12,7 @@
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnConst.h>
#include <DB/Functions/IFunction.h>
#include <statdaemons/ext/range.hpp>
namespace DB
@ -968,24 +969,94 @@ public:
/// Получить тип результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
DataTypePtr getReturnType(const DataTypes & arguments) const
{
if (arguments.size() != 2)
if (arguments.size() < 2)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 2.",
+ toString(arguments.size()) + ", should be at least 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!typeid_cast<const DataTypeString *>(&*arguments[0]) && !typeid_cast<const DataTypeFixedString *>(&*arguments[0]))
throw Exception("Illegal type " + arguments[0]->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!typeid_cast<const DataTypeString *>(&*arguments[1]) && !typeid_cast<const DataTypeFixedString *>(&*arguments[1]))
throw Exception("Illegal type " + arguments[1]->getName() + " of second argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
for (const auto arg_idx : ext::range(0, arguments.size()))
{
const auto arg = arguments[arg_idx].get();
if (!typeid_cast<const DataTypeString *>(arg) &&
!typeid_cast<const DataTypeFixedString *>(arg))
throw Exception{
"Illegal type " + arg->getName() + " of argument " + std::to_string(arg_idx + 1) + " of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
return new DataTypeString;
}
enum class instr_type : uint8_t
{
copy_string,
copy_fixed_string,
copy_const_string
};
/// column pointer augmented with offset (current offset String/FixedString, unused for Const<String>)
using column_uint_pair_t = std::pair<const IColumn *, IColumn::Offset_t>;
/// instr_type is being stored to allow using static_cast safely
using instr_t = std::pair<instr_type, column_uint_pair_t>;
using instrs_t = std::vector<instr_t>;
/** calculate total length of resulting strings (without terminating nulls), determine whether all input
* strings are constant, assemble instructions */
instrs_t getInstructions(const Block & block, const ColumnNumbers & arguments, size_t & out_length, bool & out_const)
{
instrs_t result{};
result.reserve(arguments.size());
out_length = 0;
out_const = true;
for (const auto arg_pos : arguments)
{
const auto column = block.getByPosition(arg_pos).column.get();
if (const auto col = typeid_cast<const ColumnString *>(column))
{
/** ColumnString stores strings with terminating null character
* which should not be copied, therefore the decrease of total size by
* the number of terminating nulls */
out_length += col->getChars().size() - col->getOffsets().size();
out_const = false;
result.emplace_back(instr_type::copy_string, column_uint_pair_t{col, 0});
}
else if (const auto col = typeid_cast<const ColumnFixedString *>(column))
{
out_length += col->getChars().size();
out_const = false;
result.emplace_back(instr_type::copy_fixed_string, column_uint_pair_t{col, 0});
}
else if (const auto col = typeid_cast<const ColumnConstString *>(column))
{
out_length += col->getData().size();
out_const = out_const && true;
result.emplace_back(instr_type::copy_const_string, column_uint_pair_t{col, 0});
}
else
throw Exception("Illegal column " + column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
return result;
}
/// Выполнить функцию над блоком.
void execute(Block & block, const ColumnNumbers & arguments, size_t result)
void execute(Block & block, const ColumnNumbers & arguments, const size_t result)
{
if (arguments.size() == 2)
executeBinary(block, arguments, result);
else
executeNAry(block, arguments, result);
}
void executeBinary(Block & block, const ColumnNumbers & arguments, const size_t result)
{
const IColumn * c0 = &*block.getByPosition(arguments[0]).column;
const IColumn * c1 = &*block.getByPosition(arguments[1]).column;
@ -1059,6 +1130,81 @@ public:
ErrorCodes::ILLEGAL_COLUMN);
}
}
void executeNAry(Block & block, const ColumnNumbers & arguments, const size_t result)
{
const auto size = block.rowsInFirstColumn();
std::size_t result_length{};
bool result_is_const{};
auto instrs = getInstructions(block, arguments, result_length, result_is_const);
if (result_is_const)
{
const auto out = new ColumnConst<String>{size, ""};
block.getByPosition(result).column = out;
auto & data = out->getData();
data.reserve(result_length);
for (const auto & instr : instrs)
data += static_cast<const ColumnConst<String> *>(instr.second.first)->getData();
}
else
{
const auto out = new ColumnString{};
block.getByPosition(result).column = out;
auto & out_data = out->getChars();
out_data.resize(result_length + size);
auto & out_offsets = out->getOffsets();
out_offsets.resize(size);
std::size_t out_offset{};
for (const auto row : ext::range(0, size))
{
for (auto & instr : instrs)
{
if (instr_type::copy_string == instr.first)
{
auto & in_offset = instr.second.second;
const auto col = static_cast<const ColumnString *>(instr.second.first);
const auto offset = col->getOffsets()[row];
const auto length = offset - in_offset - 1;
memcpy(&out_data[out_offset], &col->getChars()[in_offset], length);
out_offset += length;
in_offset = offset;
}
else if (instr_type::copy_fixed_string == instr.first)
{
auto & in_offset = instr.second.second;
const auto col = static_cast<const ColumnFixedString *>(instr.second.first);
const auto length = col->getN();
memcpy(&out_data[out_offset], &col->getChars()[in_offset], length);
out_offset += length;
in_offset += length;
}
else if (instr_type::copy_const_string == instr.first)
{
const auto col = static_cast<const ColumnConst<String> *>(instr.second.first);
const auto & data = col->getData();
const auto length = data.size();
memcpy(&out_data[out_offset], data.data(), length);
out_offset += length;
}
else
throw std::logic_error{"unknown instr_type"};
}
out_data[out_offset] = '\0';
out_offsets[row] = ++out_offset;
}
}
}
};

View File

@ -2,7 +2,6 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Yandex/time2str.h>
namespace DB
@ -32,8 +31,7 @@ public:
++block_index;
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
time_t min_date_time = DateLUT::instance().fromDayNum(DayNum_t(current_block.min_date));
String month_name = toString(Date2OrderedIdentifier(min_date_time) / 100);
String month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(current_block.min_date)) / 100);
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name);

View File

@ -64,6 +64,10 @@ struct ReplicatedMergeTreeLogEntry
bool currently_executing = false; /// Доступ под queue_mutex.
std::condition_variable execution_complete; /// Пробуждается когда currently_executing становится false.
/// Время создания или время копирования из общего лога в очередь конкретной реплики.
time_t create_time = 0;
void addResultToVirtualParts(StorageReplicatedMergeTree & storage);
void tagPartAsFuture(StorageReplicatedMergeTree & storage);

View File

@ -115,6 +115,7 @@ public:
UInt32 queue_size;
UInt32 inserts_in_queue;
UInt32 merges_in_queue;
UInt32 queue_oldest_time;
UInt64 log_max_index;
UInt64 log_pointer;
UInt8 total_replicas;

View File

@ -3,6 +3,7 @@
#include <DB/DataStreams/AddingDefaultBlockOutputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/PushingToViewsBlockOutputStream.h>
#include <DB/DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Parsers/ASTInsertQuery.h>
@ -128,7 +129,8 @@ BlockIO InterpreterInsertQuery::execute()
table->write(query_ptr);
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
BlockOutputStreamPtr out = new AddingDefaultBlockOutputStream(new PushingToViewsBlockOutputStream(query.database, query.table, context, query_ptr), required_columns);
BlockOutputStreamPtr out = new AddingDefaultBlockOutputStream(
new PushingToViewsBlockOutputStream(query.database, query.table, context, query_ptr), required_columns);
BlockIO res;
res.out_sample = getSampleBlock();
@ -141,9 +143,8 @@ BlockIO InterpreterInsertQuery::execute()
else
{
InterpreterSelectQuery interpreter_select(query.select, context);
BlockInputStreamPtr in = interpreter_select.execute();
in = new MaterializingBlockInputStream(in);
copyData(*in, *out);
BlockInputStreamPtr in = new MaterializingBlockInputStream(interpreter_select.execute());
res.in = new NullAndDoCopyBlockInputStream(in, out);
}
return res;

View File

@ -276,8 +276,6 @@ bool Join::insertFromBlock(const Block & block)
size_t rows = block.rows();
/// Какую структуру данных для множества использовать?
keys_fit_128_bits = false;
if (empty())
init(Set::chooseMethod(key_columns, keys_fit_128_bits, key_sizes));

View File

@ -65,6 +65,7 @@ Set::Type Set::chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & key
keys_fit_128_bits = true;
size_t keys_bytes = 0;
key_sizes.resize(keys_size);
for (size_t j = 0; j < keys_size; ++j)
{
if (!key_columns[j]->isFixed())
@ -75,6 +76,7 @@ Set::Type Set::chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & key
key_sizes[j] = key_columns[j]->sizeOfField();
keys_bytes += key_sizes[j];
}
if (keys_bytes > 16)
keys_fit_128_bits = false;
@ -88,6 +90,7 @@ Set::Type Set::chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & key
|| typeid_cast<const ColumnConstString *>(key_columns[0])
|| (typeid_cast<const ColumnFixedString *>(key_columns[0]) && !keys_fit_128_bits)))
return KEY_STRING;
/// Если много ключей - будем строить множество хэшей от них
return HASHED;
}
@ -109,8 +112,6 @@ bool Set::insertFromBlock(Block & block, bool create_ordered_set)
size_t rows = block.rows();
/// Какую структуру данных для множества использовать?
keys_fit_128_bits = false;
if (empty())
init(chooseMethod(key_columns, keys_fit_128_bits, key_sizes));

View File

@ -0,0 +1,639 @@
#include <iostream>
#include <iomanip>
#include <vector>
#include <statdaemons/Stopwatch.h>
//#define DBMS_HASH_MAP_COUNT_COLLISIONS
#define DBMS_HASH_MAP_DEBUG_RESIZES
#include <DB/Core/Types.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/Core/StringRef.h>
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Interpreters/AggregationCommon.h>
#include <smmintrin.h>
/** Выполнять так:
for file in MobilePhoneModel PageCharset Params URLDomain UTMSource Referer URL Title; do
for size in 30000 100000 300000 1000000 5000000; do
echo
BEST_METHOD=0
BEST_RESULT=0
for method in {1..12}; do
echo -ne $file $size $method '';
TOTAL_ELEMS=0
for i in {0..1000}; do
TOTAL_ELEMS=$(( $TOTAL_ELEMS + $size ))
if [[ $TOTAL_ELEMS -gt 25000000 ]]; then break; fi
./hash_map_string_2 $size $method < ${file}.bin 2>&1 |
grep HashMap | grep -oE '[0-9\.]+ elem';
done | awk -W interactive '{ if ($1 > x) { x = $1 }; printf(".") } END { print x }' | tee /tmp/hash_map_string_2_res;
CUR_RESULT=$(cat /tmp/hash_map_string_2_res | tr -d '.')
if [[ $CUR_RESULT -gt $BEST_RESULT ]]; then
BEST_METHOD=$method
BEST_RESULT=$CUR_RESULT
fi;
done;
echo Best: $BEST_METHOD - $BEST_RESULT
done;
done
*/
/** Declares a distinct StringRef-derived key type plus everything a hash table needs for it:
  * ZeroTraits specializations (an empty string plays the role of the "zero" key) and a
  * CityHash64-based DefaultHash. The types differ only in which operator== overload is
  * selected for them below — that is the comparison strategy being benchmarked.
  */
#define DefineStringRef(STRUCT) \
\
struct STRUCT : public StringRef {}; \
\
namespace ZeroTraits \
{ \
    template <> \
    inline bool check<STRUCT>(STRUCT x) { return 0 == x.size; } \
    \
    template <> \
    inline void set<STRUCT>(STRUCT & x) { x.size = 0; } \
}; \
\
template <> \
struct DefaultHash<STRUCT> \
{ \
    size_t operator() (STRUCT x) const \
    { \
        return CityHash64(x.data, x.size); \
    } \
};
/// One key type per comparison strategy; main() maps these to method numbers 1..12, 100, 101.
DefineStringRef(StringRef_Compare1_Ptrs)
DefineStringRef(StringRef_Compare1_Index)
DefineStringRef(StringRef_CompareMemcmp)
DefineStringRef(StringRef_Compare8_1_byUInt64)
DefineStringRef(StringRef_Compare16_1_byMemcmp)
DefineStringRef(StringRef_Compare16_1_byUInt64_logicAnd)
DefineStringRef(StringRef_Compare16_1_byUInt64_bitAnd)
DefineStringRef(StringRef_Compare16_1_byIntSSE)
DefineStringRef(StringRef_Compare16_1_byFloatSSE)
DefineStringRef(StringRef_Compare16_1_bySSE4)
DefineStringRef(StringRef_Compare16_1_bySSE4_wide)
DefineStringRef(StringRef_Compare16_1_bySSE_wide)
DefineStringRef(StringRef_CompareAlwaysTrue)
DefineStringRef(StringRef_CompareAlmostAlwaysTrue)
/// Strategy 1: byte-wise equality by walking two raw pointers.
inline bool operator==(StringRef_Compare1_Ptrs lhs, StringRef_Compare1_Ptrs rhs)
{
    if (lhs.size != rhs.size)
        return false;

    if (lhs.size == 0)
        return true;

    const char * l = lhs.data;
    const char * r = rhs.data;
    const char * l_end = l + lhs.size;

    for (; l < l_end; ++l, ++r)
        if (*l != *r)
            return false;

    return true;
}
/// Strategy 2: byte-wise equality by indexed access.
inline bool operator==(StringRef_Compare1_Index lhs, StringRef_Compare1_Index rhs)
{
    if (lhs.size != rhs.size)
        return false;

    if (lhs.size == 0)
        return true;

    size_t i = 0;
    while (i < lhs.size)
    {
        if (lhs.data[i] != rhs.data[i])
            return false;
        ++i;
    }

    return true;
}
/// Strategy 3: delegate to libc memcmp after the size check.
inline bool operator==(StringRef_CompareMemcmp lhs, StringRef_CompareMemcmp rhs)
{
    if (lhs.size != rhs.size)
        return false;

    return lhs.size == 0 || 0 == memcmp(lhs.data, rhs.data, lhs.size);
}
/// Strategy 4: compare in 8-byte machine words, then finish the remaining 0..7 bytes one by one.
inline bool operator==(StringRef_Compare8_1_byUInt64 lhs, StringRef_Compare8_1_byUInt64 rhs)
{
    if (lhs.size != rhs.size)
        return false;

    if (lhs.size == 0)
        return true;

    const char * p1 = lhs.data;
    const char * p2 = rhs.data;
    const size_t size = lhs.size;

    /// Whole 8-byte words.
    const char * p1_words_end = p1 + size / 8 * 8;
    for (; p1 < p1_words_end; p1 += 8, p2 += 8)
        if (reinterpret_cast<const uint64_t *>(p1)[0] != reinterpret_cast<const uint64_t *>(p2)[0])
            return false;

    /// Trailing bytes.
    const char * p1_end = lhs.data + size;
    for (; p1 < p1_end; ++p1, ++p2)
        if (*p1 != *p2)
            return false;

    return true;
}
/// Equality of exactly 16 bytes via libc memcmp.
inline bool compare_byMemcmp(const char * lhs, const char * rhs)
{
    return memcmp(lhs, rhs, 16) == 0;
}
/// Equality of exactly 16 bytes as two 64-bit words joined by short-circuiting &&.
inline bool compare_byUInt64_logicAnd(const char * lhs, const char * rhs)
{
    const uint64_t * a = reinterpret_cast<const uint64_t *>(lhs);
    const uint64_t * b = reinterpret_cast<const uint64_t *>(rhs);
    return a[0] == b[0] && a[1] == b[1];
}
/// Equality of exactly 16 bytes as two 64-bit words joined by a branch-free bitwise &.
inline bool compare_byUInt64_bitAnd(const char * lhs, const char * rhs)
{
    const uint64_t * a = reinterpret_cast<const uint64_t *>(lhs);
    const uint64_t * b = reinterpret_cast<const uint64_t *>(rhs);
    return (a[0] == b[0]) & (a[1] == b[1]);
}
/// Equality of exactly 16 bytes via SSE2: all lanes equal iff the byte-compare mask is all ones.
inline bool compare_byIntSSE(const char * lhs, const char * rhs)
{
    const __m128i a = _mm_loadu_si128(reinterpret_cast<const __m128i *>(lhs));
    const __m128i b = _mm_loadu_si128(reinterpret_cast<const __m128i *>(rhs));
    return _mm_movemask_epi8(_mm_cmpeq_epi8(a, b)) == 0xFFFF;
}
/// Equality of exactly 16 bytes reinterpreted as four floats, via SSE cmpneq.
/// NOTE: seems incorrect when comparing subnormal floats (original author's remark);
/// bit patterns that decode to NaN would presumably also compare unequal — verify before reuse.
inline bool compare_byFloatSSE(const char * lhs, const char * rhs)
{
    const __m128 a = _mm_loadu_ps(reinterpret_cast<const float *>(lhs));
    const __m128 b = _mm_loadu_ps(reinterpret_cast<const float *>(rhs));
    return 0 == _mm_movemask_ps(_mm_cmpneq_ps(a, b));
}
/** Compare `size` bytes for equality. The bulk is processed in 16-byte chunks with the
  * supplied `compare` primitive; the trailing 0..15 bytes are finished by a switch whose
  * cases intentionally fall through, using 8/4/2-byte word compares where possible
  * (`goto l8` reuses the 8-byte compare once bytes 8..11 have matched).
  * NOTE(review): the tail does unaligned word loads via reinterpret_cast — fine on x86,
  * which is what this benchmark targets.
  */
template <bool compare(const char *, const char *)>
inline bool memequal(const char * p1, const char * p2, size_t size)
{
//    const char * p1_end = p1 + size;
    const char * p1_end_16 = p1 + size / 16 * 16;

    /// Bulk: whole 16-byte chunks.
    while (p1 < p1_end_16)
    {
        if (!compare(p1, p2))
            return false;

        p1 += 16;
        p2 += 16;
    }

/*    while (p1 < p1_end)
    {
        if (*p1 != *p2)
            return false;

        ++p1;
        ++p2;
    }*/

    /// Tail: 0..15 remaining bytes; cases fall through on purpose.
    switch (size % 16)
    {
        case 15: if (p1[14] != p2[14]) return false;
        case 14: if (p1[13] != p2[13]) return false;
        case 13: if (p1[12] != p2[12]) return false;
        case 12: if (reinterpret_cast<const UInt32 *>(p1)[2] == reinterpret_cast<const UInt32 *>(p2)[2]) goto l8; else return false;
        case 11: if (p1[10] != p2[10]) return false;
        case 10: if (p1[9] != p2[9]) return false;
        case 9: if (p1[8] != p2[8]) return false;
    l8: case 8: return reinterpret_cast<const UInt64 *>(p1)[0] == reinterpret_cast<const UInt64 *>(p2)[0];
        case 7: if (p1[6] != p2[6]) return false;
        case 6: if (p1[5] != p2[5]) return false;
        case 5: if (p1[4] != p2[4]) return false;
        case 4: return reinterpret_cast<const UInt32 *>(p1)[0] == reinterpret_cast<const UInt32 *>(p2)[0];
        case 3: if (p1[2] != p2[2]) return false;
        case 2: return reinterpret_cast<const UInt16 *>(p1)[0] == reinterpret_cast<const UInt16 *>(p2)[0];
        case 1: if (p1[0] != p2[0]) return false;
        case 0: break;
    }

    return true;
}
/** Like memequal, but the 16-byte chunk equality uses SSE4.1 PTEST:
  * _mm_testc_si128(zero, x) is true iff x == 0, here applied to the XOR of the two
  * loaded chunks (XOR is zero iff the chunks are byte-identical).
  * The trailing 0..15 bytes use the same intentionally falling-through switch as memequal.
  */
inline bool memequal_sse41(const char * p1, const char * p2, size_t size)
{
//    const char * p1_end = p1 + size;
    const char * p1_end_16 = p1 + size / 16 * 16;

    __m128i zero16 = _mm_setzero_si128();

    /// Bulk: whole 16-byte chunks.
    while (p1 < p1_end_16)
    {
        if (!_mm_testc_si128(
            zero16,
            _mm_xor_si128(
                _mm_loadu_si128(reinterpret_cast<const __m128i *>(p1)),
                _mm_loadu_si128(reinterpret_cast<const __m128i *>(p2)))))
            return false;

        p1 += 16;
        p2 += 16;
    }

/*    while (p1 < p1_end)
    {
        if (*p1 != *p2)
            return false;

        ++p1;
        ++p2;
    }*/

    /// Tail: 0..15 remaining bytes; cases fall through on purpose.
    switch (size % 16)
    {
        case 15: if (p1[14] != p2[14]) return false;
        case 14: if (p1[13] != p2[13]) return false;
        case 13: if (p1[12] != p2[12]) return false;
        case 12: if (reinterpret_cast<const UInt32 *>(p1)[2] == reinterpret_cast<const UInt32 *>(p2)[2]) goto l8; else return false;
        case 11: if (p1[10] != p2[10]) return false;
        case 10: if (p1[9] != p2[9]) return false;
        case 9: if (p1[8] != p2[8]) return false;
    l8: case 8: return reinterpret_cast<const UInt64 *>(p1)[0] == reinterpret_cast<const UInt64 *>(p2)[0];
        case 7: if (p1[6] != p2[6]) return false;
        case 6: if (p1[5] != p2[5]) return false;
        case 5: if (p1[4] != p2[4]) return false;
        case 4: return reinterpret_cast<const UInt32 *>(p1)[0] == reinterpret_cast<const UInt32 *>(p2)[0];
        case 3: if (p1[2] != p2[2]) return false;
        case 2: return reinterpret_cast<const UInt16 *>(p1)[0] == reinterpret_cast<const UInt16 *>(p2)[0];
        case 1: if (p1[0] != p2[0]) return false;
        case 0: break;
    }

    return true;
}
/** Like memequal_sse41, but the main loop consumes 64 bytes per iteration (four 16-byte
  * PTEST comparisons: XOR of the chunks is zero iff they are equal), then up to three more
  * 16-byte chunks, then the same falling-through switch for the final 0..15 bytes.
  */
inline bool memequal_sse41_wide(const char * p1, const char * p2, size_t size)
{
    __m128i zero16 = _mm_setzero_si128();
//    const char * p1_end = p1 + size;

    /// Bulk: 64 bytes (four 16-byte chunks) per iteration.
    while (size >= 64)
    {
        if (_mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[0]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[0])))
            && _mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[1]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[1])))
            && _mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[2]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[2])))
            && _mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[3]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[3]))))
        {
            p1 += 64;
            p2 += 64;
            size -= 64;
        }
        else
            return false;
    }

    /// 0..3 remaining whole 16-byte chunks; cases fall through on purpose.
    switch ((size % 64) / 16)
    {
        case 3:
            if (!_mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[2]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[2]))))
                return false;
        case 2:
            if (!_mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[1]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[1]))))
                return false;
        case 1:
            if (!_mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[0]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[0]))))
                return false;
    }

    p1 += (size % 64) / 16 * 16;
    p2 += (size % 64) / 16 * 16;

/*
    if (size >= 32)
    {
        if (_mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[0]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[0])))
            & _mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[1]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[1]))))
        {
            p1 += 32;
            p2 += 32;
            size -= 32;
        }
        else
            return false;
    }

    if (size >= 16)
    {
        if (_mm_testc_si128(
                zero16,
                _mm_xor_si128(
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p1)[0]),
                    _mm_loadu_si128(&reinterpret_cast<const __m128i *>(p2)[0]))))
        {
            p1 += 16;
            p2 += 16;
            size -= 16;
        }
        else
            return false;
    }*/

    /// Tail: 0..15 remaining bytes; cases fall through on purpose.
    switch (size % 16)
    {
        case 15: if (p1[14] != p2[14]) return false;
        case 14: if (p1[13] != p2[13]) return false;
        case 13: if (p1[12] != p2[12]) return false;
        case 12: if (reinterpret_cast<const UInt32 *>(p1)[2] == reinterpret_cast<const UInt32 *>(p2)[2]) goto l8; else return false;
        case 11: if (p1[10] != p2[10]) return false;
        case 10: if (p1[9] != p2[9]) return false;
        case 9: if (p1[8] != p2[8]) return false;
    l8: case 8: return reinterpret_cast<const UInt64 *>(p1)[0] == reinterpret_cast<const UInt64 *>(p2)[0];
        case 7: if (p1[6] != p2[6]) return false;
        case 6: if (p1[5] != p2[5]) return false;
        case 5: if (p1[4] != p2[4]) return false;
        case 4: return reinterpret_cast<const UInt32 *>(p1)[0] == reinterpret_cast<const UInt32 *>(p2)[0];
        case 3: if (p1[2] != p2[2]) return false;
        case 2: return reinterpret_cast<const UInt16 *>(p1)[0] == reinterpret_cast<const UInt16 *>(p2)[0];
        case 1: if (p1[0] != p2[0]) return false;
        case 0: break;
    }

    return true;
}
/** Compare `size` bytes for equality using 16-byte SSE2 chunks: 64 bytes per iteration
  * in the main loop, then up to three more 16-byte chunks, then the falling-through
  * switch for the final 0..15 bytes (same tail scheme as memequal).
  *
  * BUG FIX: the last chunk of the 64-byte loop previously started at offset 40 instead
  * of 48, so bytes 56..63 of every 64-byte block were never compared (and bytes 40..47
  * were compared twice) — unequal inputs could be reported equal.
  */
inline bool memequal_sse_wide(const char * p1, const char * p2, size_t size)
{
    /// 16-byte equality via SSE2: chunks are equal iff the byte-compare mask is all ones.
    const auto eq16 = [](const char * a, const char * b)
    {
        return 0xFFFF == _mm_movemask_epi8(_mm_cmpeq_epi8(
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(a)),
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(b))));
    };

    /// Bulk: 64 bytes per iteration, at offsets 0, 16, 32, 48.
    while (size >= 64)
    {
        if (   eq16(p1     , p2     )
            && eq16(p1 + 16, p2 + 16)
            && eq16(p1 + 32, p2 + 32)
            && eq16(p1 + 48, p2 + 48))
        {
            p1 += 64;
            p2 += 64;
            size -= 64;
        }
        else
            return false;
    }

    /// 0..3 remaining whole 16-byte chunks; cases fall through on purpose.
    switch ((size % 64) / 16)
    {
        case 3: if (!eq16(p1 + 32, p2 + 32)) return false;
        case 2: if (!eq16(p1 + 16, p2 + 16)) return false;
        case 1: if (!eq16(p1     , p2     )) return false;
    }

    p1 += (size % 64) / 16 * 16;
    p2 += (size % 64) / 16 * 16;

    /// Tail: 0..15 remaining bytes; cases fall through on purpose.
    switch (size % 16)
    {
        case 15: if (p1[14] != p2[14]) return false;
        case 14: if (p1[13] != p2[13]) return false;
        case 13: if (p1[12] != p2[12]) return false;
        case 12: if (reinterpret_cast<const uint32_t *>(p1)[2] == reinterpret_cast<const uint32_t *>(p2)[2]) goto l8; else return false;
        case 11: if (p1[10] != p2[10]) return false;
        case 10: if (p1[9] != p2[9]) return false;
        case 9: if (p1[8] != p2[8]) return false;
    l8: case 8: return reinterpret_cast<const uint64_t *>(p1)[0] == reinterpret_cast<const uint64_t *>(p2)[0];
        case 7: if (p1[6] != p2[6]) return false;
        case 6: if (p1[5] != p2[5]) return false;
        case 5: if (p1[4] != p2[4]) return false;
        case 4: return reinterpret_cast<const uint32_t *>(p1)[0] == reinterpret_cast<const uint32_t *>(p2)[0];
        case 3: if (p1[2] != p2[2]) return false;
        case 2: return reinterpret_cast<const uint16_t *>(p1)[0] == reinterpret_cast<const uint16_t *>(p2)[0];
        case 1: if (p1[0] != p2[0]) return false;
        case 0: break;
    }

    return true;
}
/** Generates operator== for a StringRef_Compare16_1_* wrapper: size check first,
  * then delegate to memequal<> instantiated with the matching 16-byte comparator.
  */
#define Op(METHOD) \
inline bool operator==(StringRef_Compare16_1_ ## METHOD lhs, StringRef_Compare16_1_ ## METHOD rhs) \
{ \
    if (lhs.size != rhs.size) \
        return false; \
\
    if (lhs.size == 0) \
        return true; \
\
    return memequal<compare_ ## METHOD>(lhs.data, rhs.data, lhs.size); \
}

/// Instantiate operator== for each 16-byte comparison strategy.
Op(byMemcmp)
Op(byUInt64_logicAnd)
Op(byUInt64_bitAnd)
Op(byIntSSE)
Op(byFloatSSE)
/// Equality via memequal_sse41 (SSE4.1 PTEST chunks) after a size check.
inline bool operator==(StringRef_Compare16_1_bySSE4 lhs, StringRef_Compare16_1_bySSE4 rhs)
{
    if (lhs.size != rhs.size)
        return false;

    return lhs.size == 0 || memequal_sse41(lhs.data, rhs.data, lhs.size);
}
/// Equality via memequal_sse41_wide (64-byte-per-iteration SSE4.1) after a size check.
inline bool operator==(StringRef_Compare16_1_bySSE4_wide lhs, StringRef_Compare16_1_bySSE4_wide rhs)
{
    if (lhs.size != rhs.size)
        return false;

    return lhs.size == 0 || memequal_sse41_wide(lhs.data, rhs.data, lhs.size);
}
/// Equality via memequal_sse_wide (64-byte-per-iteration SSE2) after a size check.
inline bool operator==(StringRef_Compare16_1_bySSE_wide lhs, StringRef_Compare16_1_bySSE_wide rhs)
{
    if (lhs.size != rhs.size)
        return false;

    return lhs.size == 0 || memequal_sse_wide(lhs.data, rhs.data, lhs.size);
}
/// Degenerate strategy: every pair of keys is "equal" — measures an upper bound with no comparison cost.
inline bool operator==(StringRef_CompareAlwaysTrue lhs, StringRef_CompareAlwaysTrue rhs)
{
    return true;
}
/// Degenerate strategy: only lengths are compared; byte contents are assumed equal.
inline bool operator==(StringRef_CompareAlmostAlwaysTrue lhs, StringRef_CompareAlmostAlwaysTrue rhs)
{
    return lhs.size == rhs.size;
}
/// Mapped value of the benchmark hash tables: an occurrence counter.
typedef UInt64 Value;
/** Build a HashMapWithSavedHash<Key, Value> counting occurrences of each string in `data`
  * and report throughput (elements per second) to stderr; `name` labels the comparison
  * strategy being measured. Timing covers table construction and all insertions.
  */
template <typename Key>
void NO_INLINE bench(const std::vector<StringRef> & data, const char * name)
{
    using Map = HashMapWithSavedHash<Key, Value, DefaultHash<Key>>;

    Stopwatch watch;
    Map map;

    for (const auto & elem : data)
    {
        typename Map::iterator pos;
        bool is_new = false;

        map.emplace(static_cast<const Key &>(elem), pos, is_new);
        if (is_new)
            pos->second = 0;
        ++pos->second;
    }

    watch.stop();
    std::cerr << std::fixed << std::setprecision(2)
        << "HashMap (" << name << "). Size: " << map.size()
        << ", elapsed: " << watch.elapsedSeconds()
        << " (" << data.size() / watch.elapsedSeconds() << " elem/sec.)"
#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
        << ", collisions: " << map.getCollisions()
#endif
        << std::endl;
}
/** Usage: program n method < data.bin
  * Reads up to n length-prefixed strings from a compressed stdin stream into an arena,
  * then benchmarks the hash map with the comparison method selected by `method`
  * (0 or any unlisted value runs the matching subset; see the dispatch below).
  */
int main(int argc, char ** argv)
{
    /// Fix: argv[1]/argv[2] were previously dereferenced without checking argc (UB when absent).
    if (argc < 3)
    {
        std::cerr << "Usage: " << argv[0] << " n method < data.bin" << std::endl;
        return 1;
    }

    size_t n = atoi(argv[1]);
    size_t m = atoi(argv[2]);

    DB::Arena pool;
    std::vector<StringRef> data(n);

    std::cerr << "sizeof(Key) = " << sizeof(StringRef) << ", sizeof(Value) = " << sizeof(Value) << std::endl;

    {
        Stopwatch watch;
        DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO);
        DB::CompressedReadBuffer in2(in1);

        std::string tmp;
        for (size_t i = 0; i < n && !in2.eof(); ++i)
        {
            DB::readStringBinary(tmp, in2);
            /// Copy each string into the arena so the StringRefs stay valid for the whole run.
            data[i] = StringRef(pool.insert(tmp.data(), tmp.size()), tmp.size());
        }

        watch.stop();
        std::cerr << std::fixed << std::setprecision(2)
            << "Vector. Size: " << n
            << ", elapsed: " << watch.elapsedSeconds()
            << " (" << n / watch.elapsedSeconds() << " elem/sec.)"
            << std::endl;
    }

    if (!m || m == 1) bench<StringRef_Compare1_Ptrs> (data, "StringRef_Compare1_Ptrs");
    if (!m || m == 2) bench<StringRef_Compare1_Index> (data, "StringRef_Compare1_Index");
    if (!m || m == 3) bench<StringRef_CompareMemcmp> (data, "StringRef_CompareMemcmp");
    if (!m || m == 4) bench<StringRef_Compare8_1_byUInt64> (data, "StringRef_Compare8_1_byUInt64");
    if (!m || m == 5) bench<StringRef_Compare16_1_byMemcmp> (data, "StringRef_Compare16_1_byMemcmp");
    if (!m || m == 6) bench<StringRef_Compare16_1_byUInt64_logicAnd>(data, "StringRef_Compare16_1_byUInt64_logicAnd");
    if (!m || m == 7) bench<StringRef_Compare16_1_byUInt64_bitAnd> (data, "StringRef_Compare16_1_byUInt64_bitAnd");
    if (!m || m == 8) bench<StringRef_Compare16_1_byIntSSE> (data, "StringRef_Compare16_1_byIntSSE");
    if (!m || m == 9) bench<StringRef_Compare16_1_byFloatSSE> (data, "StringRef_Compare16_1_byFloatSSE");
    if (!m || m == 10) bench<StringRef_Compare16_1_bySSE4> (data, "StringRef_Compare16_1_bySSE4");
    if (!m || m == 11) bench<StringRef_Compare16_1_bySSE4_wide> (data, "StringRef_Compare16_1_bySSE4_wide");
    if (!m || m == 12) bench<StringRef_Compare16_1_bySSE_wide> (data, "StringRef_Compare16_1_bySSE_wide");
    if (!m || m == 100) bench<StringRef_CompareAlwaysTrue> (data, "StringRef_CompareAlwaysTrue");
    if (!m || m == 101) bench<StringRef_CompareAlmostAlwaysTrue> (data, "StringRef_CompareAlmostAlwaysTrue");

    /// Measurement notes from the original author:
    /// 10 > 8, 9
    /// 1, 2, 5 - bad

    return 0;
}

View File

@ -0,0 +1,431 @@
#include <iostream>
#include <iomanip>
#include <vector>
#include <statdaemons/Stopwatch.h>
#define DBMS_HASH_MAP_COUNT_COLLISIONS
#define DBMS_HASH_MAP_DEBUG_RESIZES
#include <DB/Core/Types.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/Core/StringRef.h>
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Interpreters/AggregationCommon.h>
#include <smmintrin.h>
/** Выполнять так:
for file in MobilePhoneModel PageCharset Params URLDomain UTMSource Referer URL Title; do
for size in 30000 100000 300000 1000000 5000000; do
echo
BEST_METHOD=0
BEST_RESULT=0
for method in {1..5}; do
echo -ne $file $size $method '';
TOTAL_ELEMS=0
for i in {0..1000}; do
TOTAL_ELEMS=$(( $TOTAL_ELEMS + $size ))
if [[ $TOTAL_ELEMS -gt 25000000 ]]; then break; fi
./hash_map_string_3 $size $method < ${file}.bin 2>&1 |
grep HashMap | grep -oE '[0-9\.]+ elem';
done | awk -W interactive '{ if ($1 > x) { x = $1 }; printf(".") } END { print x }' | tee /tmp/hash_map_string_2_res;
CUR_RESULT=$(cat /tmp/hash_map_string_2_res | tr -d '.')
if [[ $CUR_RESULT -gt $BEST_RESULT ]]; then
BEST_METHOD=$method
BEST_RESULT=$CUR_RESULT
fi;
done;
echo Best: $BEST_METHOD - $BEST_RESULT
done;
done
*/
/** Defines a distinct StringRef-derived key type named STRUCT, together with:
  *  - ZeroTraits specializations (a null data pointer marks the hash table's empty cell),
  *  - a DefaultHash specialization hashing the bytes with CityHash64.
  * Comments cannot go inside the macro body — `//` would swallow the line continuations.
  */
#define DefineStringRef(STRUCT) \
\
struct STRUCT : public StringRef {}; \
\
namespace ZeroTraits \
{ \
    template <> \
    inline bool check<STRUCT>(STRUCT x) { return nullptr == x.data; } \
\
    template <> \
    inline void set<STRUCT>(STRUCT & x) { x.data = nullptr; } \
}; \
\
template <> \
struct DefaultHash<STRUCT> \
{ \
    size_t operator() (STRUCT x) const \
    { \
        return CityHash64(x.data, x.size); \
    } \
};
/// Key types under test: real memcmp-based equality vs. an always-true baseline.
DefineStringRef(StringRef_CompareMemcmp)
DefineStringRef(StringRef_CompareAlwaysTrue)
/// Byte-wise equality via memcmp, with fast paths for differing lengths and empty strings.
inline bool operator==(StringRef_CompareMemcmp lhs, StringRef_CompareMemcmp rhs)
{
    if (lhs.size != rhs.size)
        return false;

    return lhs.size == 0 || 0 == memcmp(lhs.data, rhs.data, lhs.size);
}
/// Baseline: pretends any two keys are equal, measuring pure hashing/probing cost.
inline bool operator==(StringRef_CompareAlwaysTrue, StringRef_CompareAlwaysTrue)
{
    return true;
}
/// One mixing round of the FastHash64 mixer (xor-shift, multiply, xor-shift).
/// Uses a GCC statement expression: mutates h in place and evaluates to the last expression.
#define mix(h) ({ \
    (h) ^= (h) >> 23; \
    (h) *= 0x2127599bf4325c37ULL; \
    (h) ^= (h) >> 47; })
/** fasthash-style 64-bit hash over a StringRef: processes the input as 64-bit
  * words through the `mix` round above, then folds the 1..7 tail bytes in via
  * the switch below.
  */
struct FastHash64
{
    size_t operator() (StringRef x) const
    {
        const char * buf = x.data;
        size_t len = x.size;

        const uint64_t m = 0x880355f21e6d1965ULL;
        /// NOTE(review): unaligned 64-bit loads through this pointer — assumed acceptable on the target (x86).
        const uint64_t *pos = (const uint64_t *)buf;
        const uint64_t *end = pos + (len / 8);
        const unsigned char *pos2;
        uint64_t h = len * m;
        uint64_t v;

        while (pos != end) {
            v = *pos++;
            h ^= mix(v);
            h *= m;
        }

        pos2 = (const unsigned char*)pos;
        v = 0;

        /// Intentional fallthrough on every case: accumulates the remaining tail bytes into v.
        switch (len & 7) {
        case 7: v ^= (uint64_t)pos2[6] << 48;
        case 6: v ^= (uint64_t)pos2[5] << 40;
        case 5: v ^= (uint64_t)pos2[4] << 32;
        case 4: v ^= (uint64_t)pos2[3] << 24;
        case 3: v ^= (uint64_t)pos2[2] << 16;
        case 2: v ^= (uint64_t)pos2[1] << 8;
        case 1: v ^= (uint64_t)pos2[0];
            h ^= mix(v);
            h *= m;
        }

        return mix(h);
    }
};
/** "CrapWow" 64-bit hash implemented entirely in x86-64 inline assembly.
  * NOTE(review): the asm, its register constraints, and the clobber list are
  * delicate and left untouched — verify against the reference implementation
  * before modifying anything here.
  */
struct CrapWow
{
    size_t operator() (StringRef x) const
    {
        const char * key = x.data;
        size_t len = x.size;
        size_t seed = 0;

        const UInt64 m = 0x95b47aa3355ba1a1, n = 0x8a970be7488fda55;
        UInt64 hash;
        // 3 = m, 4 = n
        // r12 = h, r13 = k, ecx = seed, r12 = key
        asm(
            "leaq (%%rcx,%4), %%r13\n"
            "movq %%rdx, %%r14\n"
            "movq %%rcx, %%r15\n"
            "movq %%rcx, %%r12\n"
            "addq %%rax, %%r13\n"
            "andq $0xfffffffffffffff0, %%rcx\n"
            "jz QW%=\n"
            "addq %%rcx, %%r14\n\n"
            "negq %%rcx\n"
            "XW%=:\n"
            "movq %4, %%rax\n"
            "mulq (%%r14,%%rcx)\n"
            "xorq %%rax, %%r12\n"
            "xorq %%rdx, %%r13\n"
            "movq %3, %%rax\n"
            "mulq 8(%%r14,%%rcx)\n"
            "xorq %%rdx, %%r12\n"
            "xorq %%rax, %%r13\n"
            "addq $16, %%rcx\n"
            "jnz XW%=\n"
            "QW%=:\n"
            "movq %%r15, %%rcx\n"
            "andq $8, %%r15\n"
            "jz B%=\n"
            "movq %4, %%rax\n"
            "mulq (%%r14)\n"
            "addq $8, %%r14\n"
            "xorq %%rax, %%r12\n"
            "xorq %%rdx, %%r13\n"
            "B%=:\n"
            "andq $7, %%rcx\n"
            "jz F%=\n"
            "movq $1, %%rdx\n"
            "shlq $3, %%rcx\n"
            "movq %3, %%rax\n"
            "shlq %%cl, %%rdx\n"
            "addq $-1, %%rdx\n"
            "andq (%%r14), %%rdx\n"
            "mulq %%rdx\n"
            "xorq %%rdx, %%r12\n"
            "xorq %%rax, %%r13\n"
            "F%=:\n"
            "leaq (%%r13,%4), %%rax\n"
            "xorq %%r12, %%rax\n"
            "mulq %4\n"
            "xorq %%rdx, %%rax\n"
            "xorq %%r12, %%rax\n"
            "xorq %%r13, %%rax\n"
            : "=a"(hash), "=c"(key), "=d"(key)
            : "r"(m), "r"(n), "a"(seed), "c"(len), "d"(key)
            : "%r12", "%r13", "%r14", "%r15", "cc"
        );
        return hash;
    }
};
/** Word-at-a-time hash: folds each (unaligned) 64-bit word through intHash64.
  * Strings shorter than 8 bytes are delegated to hashLessThan8; longer strings
  * finish with the last 8 bytes, which may overlap the loop's words.
  */
struct SimpleHash
{
    size_t operator() (StringRef x) const
    {
        const char * pos = x.data;
        size_t size = x.size;
        const char * end = pos + size;

        size_t res = 0;

        if (size == 0)
            return 0;

        if (size < 8)
        {
            return hashLessThan8(x.data, x.size);
        }

        while (pos + 8 < end)
        {
            /// NOTE(review): unaligned read — assumed acceptable on the target (x86).
            UInt64 word = *reinterpret_cast<const UInt64 *>(pos);
            res = intHash64(word ^ res);

            pos += 8;
        }

        /// Tail: the last 8 bytes (possibly overlapping the loop), so no per-byte tail loop is needed.
        UInt64 word = *reinterpret_cast<const UInt64 *>(end - 8);
        res = intHash64(word ^ res);

        return res;
    }
};
/** Cheap hash: xor-folds unaligned 64-bit words with a single multiply/xor-shift
  * round per word (the multiplier constants match the MurmurHash3 64-bit
  * finalizer), then a final avalanche over the last 8 bytes.
  */
struct VerySimpleHash
{
    size_t operator() (StringRef x) const
    {
        const char * pos = x.data;
        size_t size = x.size;
        const char * end = pos + size;

        size_t res = 0;

        if (size == 0)
            return 0;

        if (size < 8)
        {
            return hashLessThan8(x.data, x.size);
        }

        while (pos + 8 < end)
        {
            res ^= reinterpret_cast<const UInt64 *>(pos)[0];
            res ^= res >> 33;
            res *= 0xff51afd7ed558ccdULL;

            pos += 8;
        }

        /// Final round over the last (possibly overlapping) 8 bytes.
        res ^= *reinterpret_cast<const UInt64 *>(end - 8);
        res ^= res >> 33;
        res *= 0xc4ceb9fe1a85ec53ULL;
        res ^= res >> 33;

        return res;
    }
};
/*struct CRC32Hash
{
size_t operator() (StringRef x) const
{
const char * pos = x.data;
size_t size = x.size;
if (size == 0)
return 0;
if (size < 8)
{
return hashLessThan8(x.data, x.size);
}
const char * end = pos + size;
size_t res = -1ULL;
do
{
UInt64 word = *reinterpret_cast<const UInt64 *>(pos);
res = _mm_crc32_u64(res, word);
pos += 8;
} while (pos + 8 < end);
UInt64 word = *reinterpret_cast<const UInt64 *>(end - 8);
res = _mm_crc32_u64(res, word);
return res;
}
};*/
/** CRC32C-based hash computing two interleaved CRC streams over 16-byte chunks
  * (instruction-level parallelism), combined at the end with hashLen16.
  * Requires SSE4.2 for _mm_crc32_u64. Inputs shorter than 16 bytes go to hashLessThan16.
  */
struct CRC32ILPHash
{
    size_t operator() (StringRef x) const
    {
        const char * pos = x.data;
        size_t size = x.size;

        if (size == 0)
            return 0;

        if (size < 16)
        {
            return hashLessThan16(x.data, x.size);
        }

        const char * end = pos + size;
        const char * end_16 = pos + size / 16 * 16;

        size_t res0 = -1ULL;
        size_t res1 = -1ULL;

        do
        {
            UInt64 word0 = reinterpret_cast<const UInt64 *>(pos)[0];
            UInt64 word1 = reinterpret_cast<const UInt64 *>(pos)[1];
            res0 = _mm_crc32_u64(res0, word0);
            res1 = _mm_crc32_u64(res1, word1);

            pos += 16;
        } while (pos < end_16);

        /// Tail: the last 16 bytes, which may overlap the chunk loop above (size >= 16 is guaranteed here).
        UInt64 word0 = *reinterpret_cast<const UInt64 *>(end - 8);
        UInt64 word1 = *reinterpret_cast<const UInt64 *>(end - 16);

        /* return HashLen16(Rotate(word0 - word1, 43) + Rotate(res0, 30) + res1,
        word0 + Rotate(word1 ^ k3, 20) - res0 + size);*/

        res0 = _mm_crc32_u64(res0, word0);
        res1 = _mm_crc32_u64(res1, word1);

        return hashLen16(res0, res1);
    }
};
/// Aggregated (counter) value type used by every benchmark variant.
typedef UInt64 Value;

/** Count occurrences of each key in `data` with HashMapWithSavedHash<Key, Value, Hash>
  * and print size/throughput statistics to stderr. Hash selects the hash function under test.
  */
template <typename Key, typename Hash>
void NO_INLINE bench(const std::vector<StringRef> & data, const char * name)
{
    Stopwatch watch;

    typedef HashMapWithSavedHash<Key, Value, Hash> Map;
    Map map;
    typename Map::iterator it;
    bool inserted;

    for (size_t i = 0, size = data.size(); i < size; ++i)
    {
        /// Key derives from StringRef via DefineStringRef, hence the reference cast.
        map.emplace(static_cast<const Key &>(data[i]), it, inserted);
        if (inserted)
            it->second = 0;  /// Zero the counter only for newly inserted keys, then count below.
        ++it->second;
    }

    watch.stop();
    std::cerr << std::fixed << std::setprecision(2)
        << "HashMap (" << name << "). Size: " << map.size()
        << ", elapsed: " << watch.elapsedSeconds()
        << " (" << data.size() / watch.elapsedSeconds() << " elem/sec.)"
#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
        << ", collisions: " << map.getCollisions()
#endif
        << std::endl;
}
/** Usage: program n method < data.bin
  * Reads up to n length-prefixed strings from a compressed stdin stream into an arena,
  * then benchmarks the hash function selected by `method` (0 runs them all).
  */
int main(int argc, char ** argv)
{
    /// Fix: argv[1]/argv[2] were previously dereferenced without checking argc (UB when absent).
    if (argc < 3)
    {
        std::cerr << "Usage: " << argv[0] << " n method < data.bin" << std::endl;
        return 1;
    }

    size_t n = atoi(argv[1]);
    size_t m = atoi(argv[2]);

    DB::Arena pool;
    std::vector<StringRef> data(n);

    std::cerr << "sizeof(Key) = " << sizeof(StringRef) << ", sizeof(Value) = " << sizeof(Value) << std::endl;

    {
        Stopwatch watch;
        DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO);
        DB::CompressedReadBuffer in2(in1);

        std::string tmp;
        for (size_t i = 0; i < n && !in2.eof(); ++i)
        {
            DB::readStringBinary(tmp, in2);
            /// Copy each string into the arena so the StringRefs stay valid for the whole run.
            data[i] = StringRef(pool.insert(tmp.data(), tmp.size()), tmp.size());
        }

        watch.stop();
        std::cerr << std::fixed << std::setprecision(2)
            << "Vector. Size: " << n
            << ", elapsed: " << watch.elapsedSeconds()
            << " (" << n / watch.elapsedSeconds() << " elem/sec.)"
            << std::endl;
    }

    /// NOTE(review): the local CRC32Hash definition earlier in this file is commented out;
    /// method 5 presumably relies on a CRC32Hash from an included header — verify before use.
    if (!m || m == 1) bench<StringRef_CompareMemcmp, DefaultHash<StringRef>>(data, "StringRef_CityHash64");
    if (!m || m == 2) bench<StringRef_CompareMemcmp, FastHash64> (data, "StringRef_FastHash64");
    if (!m || m == 3) bench<StringRef_CompareMemcmp, SimpleHash> (data, "StringRef_SimpleHash");
    if (!m || m == 4) bench<StringRef_CompareMemcmp, CrapWow> (data, "StringRef_CrapWow");
    if (!m || m == 5) bench<StringRef_CompareMemcmp, CRC32Hash> (data, "StringRef_CRC32Hash");
    if (!m || m == 6) bench<StringRef_CompareMemcmp, CRC32ILPHash> (data, "StringRef_CRC32ILPHash");
    if (!m || m == 7) bench<StringRef_CompareMemcmp, VerySimpleHash>(data, "StringRef_VerySimpleHash");

    return 0;
}

View File

@ -0,0 +1,182 @@
#include <iostream>
#include <iomanip>
#include <vector>
#include <unordered_map>
#include <sparsehash/dense_hash_map>
#include <sparsehash/sparse_hash_map>
#include <statdaemons/Stopwatch.h>
//#define DBMS_HASH_MAP_COUNT_COLLISIONS
#define DBMS_HASH_MAP_DEBUG_RESIZES
#include <DB/Core/Types.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/Core/StringRef.h>
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Interpreters/AggregationCommon.h>
/** String reference with a small-string optimization: strings of up to 12 bytes
  * are stored inline in data_small, longer strings are referenced via data_big.
  * The active union member is determined solely by `size` (see isSmall()).
  */
struct SmallStringRef
{
    /// NOTE(review): constructors narrow size_t to UInt32 — lengths above 4 GiB would be truncated silently.
    UInt32 size;

    union
    {
        const char * data_big;
        char data_small[12];
    };

    bool isSmall() const { return size <= 12; }

    const char * data() const
    {
        return isSmall() ? data_small : data_big;
    }

    SmallStringRef(const char * data_, size_t size_)
    {
        size = size_;
        if (isSmall())
            memcpy(data_small, data_, size_);
        else
            data_big = data_;
    }

    SmallStringRef(const unsigned char * data_, size_t size_) : SmallStringRef(reinterpret_cast<const char *>(data_), size_) {}
    SmallStringRef(const std::string & s) : SmallStringRef(s.data(), s.size()) {}
    /// Default constructor leaves all members uninitialized.
    SmallStringRef() {}

    std::string toString() const { return std::string(data(), size); }
};
/// Equality: compare lengths first, then the bytes (inline or external) with the wide SSE2 comparison.
inline bool operator==(SmallStringRef lhs, SmallStringRef rhs)
{
    if (lhs.size != rhs.size)
        return false;

    return lhs.size == 0 || memequalSSE2Wide(lhs.data(), rhs.data(), lhs.size);
}
/// Zero-key traits: an empty SmallStringRef (size == 0) serves as the hash table's sentinel key.
namespace ZeroTraits
{
    template <>
    inline bool check<SmallStringRef>(SmallStringRef x) { return x.size == 0; }

    template <>
    inline void set<SmallStringRef>(SmallStringRef & x) { x.size = 0; }
};
/// Hash a SmallStringRef by viewing its bytes as an ordinary StringRef
/// (data() resolves to either the inline buffer or the external pointer).
template <>
struct DefaultHash<SmallStringRef>
{
    size_t operator() (SmallStringRef x) const
    {
        return DefaultHash<StringRef>()(StringRef(x.data(), x.size));
    }
};
/// Aggregated (counter) value type for both benchmark variants.
typedef UInt64 Value;

/** Usage: program n method < data.bin
  * Compares HashMapWithSavedHash keyed by plain StringRef (method 1) against
  * the inline small-string key SmallStringRef (method 2); method 0 runs both.
  */
int main(int argc, char ** argv)
{
    /// Fix: argv[1]/argv[2] were previously dereferenced without checking argc (UB when absent).
    if (argc < 3)
    {
        std::cerr << "Usage: " << argv[0] << " n method < data.bin" << std::endl;
        return 1;
    }

    size_t n = atoi(argv[1]);
    size_t m = atoi(argv[2]);

    DB::Arena pool;
    std::vector<StringRef> data(n);

    std::cerr << "sizeof(Key) = " << sizeof(SmallStringRef) << ", sizeof(Value) = " << sizeof(Value) << std::endl;

    {
        Stopwatch watch;
        DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO);
        DB::CompressedReadBuffer in2(in1);

        std::string tmp;
        for (size_t i = 0; i < n && !in2.eof(); ++i)
        {
            DB::readStringBinary(tmp, in2);
            /// Copy each string into the arena so the StringRefs stay valid for the whole run.
            data[i] = StringRef(pool.insert(tmp.data(), tmp.size()), tmp.size());
        }

        watch.stop();
        std::cerr << std::fixed << std::setprecision(2)
            << "Vector. Size: " << n
            << ", elapsed: " << watch.elapsedSeconds()
            << " (" << n / watch.elapsedSeconds() << " elem/sec.)"
            << std::endl;
    }

    if (!m || m == 1)
    {
        Stopwatch watch;

        typedef HashMapWithSavedHash<StringRef, Value> Map;
        Map map;
        Map::iterator it;
        bool inserted;

        for (size_t i = 0; i < n; ++i)
        {
            map.emplace(data[i], it, inserted);
            if (inserted)
                it->second = 0;
            ++it->second;
        }

        watch.stop();
        std::cerr << std::fixed << std::setprecision(2)
            << "HashMap (StringRef). Size: " << map.size()
            << ", elapsed: " << watch.elapsedSeconds()
            << " (" << n / watch.elapsedSeconds() << " elem/sec.)"
#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
            << ", collisions: " << map.getCollisions()
#endif
            << std::endl;
    }

    if (!m || m == 2)
    {
        Stopwatch watch;

        typedef HashMapWithSavedHash<SmallStringRef, Value> Map;
        Map map;
        Map::iterator it;
        bool inserted;

        for (size_t i = 0; i < n; ++i)
        {
            /// Re-wrap each arena string as a SmallStringRef key (copies short strings inline).
            map.emplace(SmallStringRef(data[i].data, data[i].size), it, inserted);
            if (inserted)
                it->second = 0;
            ++it->second;
        }

        watch.stop();
        std::cerr << std::fixed << std::setprecision(2)
            << "HashMap (SmallStringRef). Size: " << map.size()
            << ", elapsed: " << watch.elapsedSeconds()
            << " (" << n / watch.elapsedSeconds() << " elem/sec.)"
#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
            << ", collisions: " << map.getCollisions()
#endif
            << std::endl;
    }

    return 0;
}

View File

@ -1,5 +1,4 @@
#include <Yandex/DateLUT.h>
#include <Yandex/time2str.h>
#include <Poco/DateTimeParser.h>
#include <Poco/AutoPtr.h>
@ -88,9 +87,9 @@ QueryParseResult QueryParser::parse(std::istream & s)
result.CounterID = DB::parse<unsigned>(getValueOfOneTextElement(result.query, CounterID_element_name));
int time_zone_diff = 0;
result.date_first = Time2Date(Poco::DateTimeParser::parse(
result.date_first = DateLUT::instance().toDate(Poco::DateTimeParser::parse(
getValueOfOneTextElement(result.query, date_first_element_name), time_zone_diff).timestamp().epochTime());
result.date_last = Time2Date(Poco::DateTimeParser::parse(
result.date_last = DateLUT::instance().toDate(Poco::DateTimeParser::parse(
getValueOfOneTextElement(result.query, date_last_element_name), time_zone_diff).timestamp().epochTime());
if (result.date_first > result.date_last)
@ -194,7 +193,7 @@ QueryParseResult QueryParser::parse(std::istream & s)
result.limit = DB::parse<unsigned>(limit_nodes->item(0)->innerText());
LOG_DEBUG(log, "CounterID: " << result.CounterID
<< ", dates: " << Date2Str(result.date_first) << " - " << Date2Str(result.date_last));
<< ", dates: " << mysqlxx::Date(result.date_first) << " - " << mysqlxx::Date(result.date_last));
/// получаем список имён атрибутов
Poco::AutoPtr<Poco::XML::NodeList> attributes = result.query->getElementsByTagName("attribute");

View File

@ -127,13 +127,13 @@ void TCPHandler::runImpl()
state.io = executeQuery(state.query, query_context, false, state.stage);
if (state.io.out)
state.is_insert = true;
state.need_receive_data_for_insert = true;
after_check_cancelled.restart();
after_send_progress.restart();
/// Запрос требует приёма данных от клиента?
if (state.is_insert)
if (state.need_receive_data_for_insert)
processInsertQuery(global_settings);
else
processOrdinaryQuery();
@ -566,7 +566,7 @@ bool TCPHandler::receiveData()
{
/// Если запрос на вставку, то данные нужно писать напрямую в state.io.out.
/// Иначе пишем блоки во временную таблицу external_table_name.
if (!state.is_insert)
if (!state.need_receive_data_for_insert)
{
StoragePtr storage;
/// Если такой таблицы не существовало, создаем ее.

View File

@ -45,8 +45,8 @@ struct QueryState
bool is_empty = true;
/// Данные были отправлены.
bool sent_all_data = false;
/// Запрос на вставку или нет.
bool is_insert = false;
/// Запрос требует приёма данных от клиента (INSERT, но не INSERT SELECT).
bool need_receive_data_for_insert = false;
/// Для вывода прогресса - разница после предыдущей отправки прогресса.
Progress progress;

View File

@ -1,7 +1,7 @@
#include <DB/Storages/MergeTree/ActiveDataPartSet.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <Yandex/time2str.h>
namespace DB
{
@ -110,8 +110,8 @@ String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, U
/// Имя директории для куска иммет вид: YYYYMMDD_YYYYMMDD_N_N_L.
String res;
{
unsigned left_date_id = Date2OrderedIdentifier(date_lut.fromDayNum(left_date));
unsigned right_date_id = Date2OrderedIdentifier(date_lut.fromDayNum(right_date));
unsigned left_date_id = date_lut.toNumYYYYMMDD(left_date);
unsigned right_date_id = date_lut.toNumYYYYMMDD(right_date);
WriteBufferFromString wb(res);
@ -155,8 +155,8 @@ void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, con
DateLUT & date_lut = DateLUT::instance();
part.left_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
part.right_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[2].offset, matches[2].length)));
part.left_date = date_lut.YYYYMMDDToDayNum(parse<UInt32>(file_name.substr(matches[1].offset, matches[1].length)));
part.right_date = date_lut.YYYYMMDDToDayNum(parse<UInt32>(file_name.substr(matches[2].offset, matches[2].length)));
part.left = parse<UInt64>(file_name.substr(matches[3].offset, matches[3].length));
part.right = parse<UInt64>(file_name.substr(matches[4].offset, matches[4].length));
part.level = parse<UInt32>(file_name.substr(matches[5].offset, matches[5].length));

View File

@ -1,4 +1,3 @@
#include <Yandex/time2str.h>
#include <Poco/Ext/ScopedTry.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
@ -1164,10 +1163,10 @@ static std::pair<String, DayNum_t> getMonthNameAndDayNum(const Field & partition
throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM",
ErrorCodes::INVALID_PARTITION_NAME);
DayNum_t date = DateLUT::instance().toDayNum(OrderedIdentifier2Date(month_name + "01"));
DayNum_t date = DateLUT::instance().YYYYMMDDToDayNum(parse<UInt32>(month_name + "01"));
/// Не можем просто сравнить date с нулем, потому что 0 тоже валидный DayNum.
if (month_name != toString(Date2OrderedIdentifier(DateLUT::instance().fromDayNum(date)) / 100))
if (month_name != toString(DateLUT::instance().toNumYYYYMMDD(date) / 100))
throw Exception("Invalid partition format: " + month_name + " doesn't look like month.",
ErrorCodes::INVALID_PARTITION_NAME);

View File

@ -432,7 +432,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);
if (new_data_part->name != merged_name)
LOG_ERROR(log, "Unexpected part name: " << new_data_part->name << " instead of " << merged_name);
throw Exception("Unexpected part name: " + new_data_part->name + " instead of " + merged_name, ErrorCodes::LOGICAL_ERROR);
/// Проверим, что удалились все исходные куски и только они.
if (replaced_parts.size() != parts.size())
@ -443,13 +443,9 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
else
{
for (size_t i = 0; i < parts.size(); ++i)
{
if (parts[i]->name != replaced_parts[i]->name)
{
LOG_ERROR(log, "Unexpected part removed when adding " << new_data_part->name << ": " << replaced_parts[i]->name
<< " instead of " << parts[i]->name);
}
}
throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
+ " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
}
LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);

View File

@ -52,13 +52,27 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
bool skip_sanity_checks = false;
if (zookeeper && zookeeper->exists(replica_path + "/flags/force_restore_data"))
try
{
skip_sanity_checks = true;
zookeeper->remove(replica_path + "/flags/force_restore_data");
if (zookeeper && zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
skip_sanity_checks = true;
zookeeper->remove(replica_path + "/flags/force_restore_data");
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
<< replica_path << "/flags/force_restore_data).");
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
<< replica_path << "/flags/force_restore_data).");
}
}
catch (const zkutil::KeeperException & e)
{
/// Не удалось соединиться с ZK (об этом стало известно при попытке выполнить первую операцию).
if (e.code == ZCONNECTIONLOSS)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
zookeeper = nullptr;
}
else
throw;
}
data.loadDataParts(skip_sanity_checks);
@ -592,8 +606,10 @@ void StorageReplicatedMergeTree::loadQueue()
std::sort(children.begin(), children.end());
for (const String & child : children)
{
String s = zookeeper->get(replica_path + "/queue/" + child);
zkutil::Stat stat;
String s = zookeeper->get(replica_path + "/queue/" + child, &stat);
LogEntryPtr entry = LogEntry::parse(s);
entry->create_time = stat.ctime / 1000;
entry->znode_name = child;
entry->addResultToVirtualParts(*this);
queue.push_back(entry);
@ -625,12 +641,14 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
size_t count = 0;
String entry_str;
while (zookeeper->tryGet(zookeeper_path + "/log/log-" + padIndex(index), entry_str))
zkutil::Stat stat;
while (zookeeper->tryGet(zookeeper_path + "/log/log-" + padIndex(index), entry_str, &stat))
{
++count;
++index;
LogEntryPtr entry = LogEntry::parse(entry_str);
entry->create_time = stat.ctime / 1000;
/// Одновременно добавим запись в очередь и продвинем указатель на лог.
zkutil::Ops ops;
@ -1510,6 +1528,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
LogEntryPtr log_entry = new LogEntry;
log_entry->type = LogEntry::GET_PART;
log_entry->create_time = time(0);
log_entry->source_replica = "";
log_entry->new_part_name = part_name;
@ -2065,7 +2084,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn
{
/// Диапазон дат - весь месяц.
DateLUT & lut = DateLUT::instance();
time_t start_time = OrderedIdentifier2Date(month_name + "01");
time_t start_time = DateLUT::instance().YYYYMMDDToDate(parse<UInt32>(month_name + "01"));
DayNum_t left_date = lut.toDayNum(start_time);
DayNum_t right_date = DayNum_t(static_cast<size_t>(left_date) + lut.daysInMonth(start_time) - 1);
@ -2374,7 +2393,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
res.is_leader = is_leader_node;
res.is_readonly = is_read_only;
res.is_session_expired = zookeeper->expired();
res.is_session_expired = !zookeeper || zookeeper->expired();
{
std::lock_guard<std::mutex> lock(queue_mutex);
@ -2383,6 +2402,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
res.inserts_in_queue = 0;
res.merges_in_queue = 0;
res.queue_oldest_time = 0;
for (const LogEntryPtr & entry : queue)
{
@ -2390,6 +2410,9 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
++res.inserts_in_queue;
if (entry->type == LogEntry::MERGE_PARTS)
++res.merges_in_queue;
if (entry->create_time && (!res.queue_oldest_time || entry->create_time < res.queue_oldest_time))
res.queue_oldest_time = entry->create_time;
}
}

View File

@ -1,6 +1,7 @@
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Storages/StorageSystemReplicas.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
@ -29,6 +30,7 @@ StorageSystemReplicas::StorageSystemReplicas(const std::string & name_, const Co
{ "queue_size", new DataTypeUInt32 },
{ "inserts_in_queue", new DataTypeUInt32 },
{ "merges_in_queue", new DataTypeUInt32 },
{ "queue_oldest_time", new DataTypeDateTime},
{ "log_max_index", new DataTypeUInt64 },
{ "log_pointer", new DataTypeUInt64 },
{ "total_replicas", new DataTypeUInt8 },
@ -115,6 +117,7 @@ BlockInputStreams StorageSystemReplicas::read(
ColumnWithNameAndType col_queue_size { new ColumnUInt32, new DataTypeUInt32, "queue_size"};
ColumnWithNameAndType col_inserts_in_queue { new ColumnUInt32, new DataTypeUInt32, "inserts_in_queue"};
ColumnWithNameAndType col_merges_in_queue { new ColumnUInt32, new DataTypeUInt32, "merges_in_queue"};
ColumnWithNameAndType col_queue_oldest_time { new ColumnUInt32, new DataTypeDateTime, "queue_oldest_time"};
ColumnWithNameAndType col_log_max_index { new ColumnUInt64, new DataTypeUInt64, "log_max_index"};
ColumnWithNameAndType col_log_pointer { new ColumnUInt64, new DataTypeUInt64, "log_pointer"};
ColumnWithNameAndType col_total_replicas { new ColumnUInt8, new DataTypeUInt8, "total_replicas"};
@ -140,6 +143,7 @@ BlockInputStreams StorageSystemReplicas::read(
col_queue_size .column->insert(UInt64(status.queue_size));
col_inserts_in_queue .column->insert(UInt64(status.inserts_in_queue));
col_merges_in_queue .column->insert(UInt64(status.merges_in_queue));
col_queue_oldest_time .column->insert(UInt64(status.queue_oldest_time));
col_log_max_index .column->insert(status.log_max_index);
col_log_pointer .column->insert(status.log_pointer);
col_total_replicas .column->insert(UInt64(status.total_replicas));
@ -162,6 +166,7 @@ BlockInputStreams StorageSystemReplicas::read(
col_queue_size,
col_inserts_in_queue,
col_merges_in_queue,
col_queue_oldest_time,
col_log_max_index,
col_log_pointer,
col_total_replicas,

View File

@ -0,0 +1,20 @@
#include <DB/Storages/MergeTree/ActiveDataPartSet.h>
#include <Yandex/time2str.h>
#include <mysqlxx/DateTime.h>
/// Smoke test: for today and the 9 preceding days, build a one-day part name
/// via ActiveDataPartSet::getPartName and convert its date prefix back to a
/// timestamp with OrderedIdentifier2Date, printing both for manual inspection.
int main(int argc, char ** argv)
{
    DayNum_t today = DateLUT::instance().toDayNum(time(0));

    /// Counts down from `today`; the cast keeps the comparison in DayNum_t terms.
    for (DayNum_t date = today; DayNum_t(date + 10) > today; --date)
    {
        std::string name = DB::ActiveDataPartSet::getPartName(date, date, 0, 0, 0);
        std::cerr << name << '\n';

        time_t time = OrderedIdentifier2Date(name);
        std::cerr << mysqlxx::DateTime(time) << '\n';
    }

    return 0;
}

View File

@ -0,0 +1,14 @@
-- IN over a tuple: select numbers whose (n, 2n) pair appears in the subquery
-- stream; the small max_block_size forces multi-block processing of the set.
SET max_block_size = 1000;

SELECT number FROM
(
    SELECT * FROM system.numbers LIMIT 10000
)
WHERE (number, number * 2) IN
(
    SELECT number, number * 2
    FROM system.numbers
    WHERE number % 1000 = 1
    LIMIT 2
)
LIMIT 2;

View File

@ -0,0 +1,481 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1

View File

@ -0,0 +1,185 @@
select '{ key: fn, value: concat }' == concat('{ key: ', toFixedString('fn', 2), ', value: ', 'concat', ' }');
select concat('a', 'b') == 'ab';
select concat('a', materialize('b')) == 'ab';
select concat(materialize('a'), 'b') == 'ab';
select concat(materialize('a'), materialize('b')) == 'ab';
select concat('a', toFixedString('b', 1)) == 'ab';
select concat('a', materialize(toFixedString('b', 1))) == 'ab';
select concat(materialize('a'), toFixedString('b', 1)) == 'ab';
select concat(materialize('a'), materialize(toFixedString('b', 1))) == 'ab';
select concat(toFixedString('a', 1), 'b') == 'ab';
select concat(toFixedString('a', 1), materialize('b')) == 'ab';
select concat(materialize(toFixedString('a', 1)), 'b') == 'ab';
select concat(materialize(toFixedString('a', 1)), materialize('b')) == 'ab';
select concat(toFixedString('a', 1), toFixedString('b', 1)) == 'ab';
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1))) == 'ab';
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1)) == 'ab';
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1))) == 'ab';
select concat('a', 'b') == 'ab' from system.numbers limit 5;
select concat('a', materialize('b')) == 'ab' from system.numbers limit 5;
select concat(materialize('a'), 'b') == 'ab' from system.numbers limit 5;
select concat(materialize('a'), materialize('b')) == 'ab' from system.numbers limit 5;
select concat('a', toFixedString('b', 1)) == 'ab' from system.numbers limit 5;
select concat('a', materialize(toFixedString('b', 1))) == 'ab' from system.numbers limit 5;
select concat(materialize('a'), toFixedString('b', 1)) == 'ab' from system.numbers limit 5;
select concat(materialize('a'), materialize(toFixedString('b', 1))) == 'ab' from system.numbers limit 5;
select concat(toFixedString('a', 1), 'b') == 'ab' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize('b')) == 'ab' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), 'b') == 'ab' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize('b')) == 'ab' from system.numbers limit 5;
select concat(toFixedString('a', 1), toFixedString('b', 1)) == 'ab' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1))) == 'ab' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1)) == 'ab' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1))) == 'ab' from system.numbers limit 5;
select concat('a', 'b', 'c') == 'abc';
select concat('a', 'b', materialize('c')) == 'abc';
select concat('a', materialize('b'), 'c') == 'abc';
select concat('a', materialize('b'), materialize('c')) == 'abc';
select concat(materialize('a'), 'b', 'c') == 'abc';
select concat(materialize('a'), 'b', materialize('c')) == 'abc';
select concat(materialize('a'), materialize('b'), 'c') == 'abc';
select concat(materialize('a'), materialize('b'), materialize('c')) == 'abc';
select concat('a', 'b', toFixedString('c', 1)) == 'abc';
select concat('a', 'b', materialize(toFixedString('c', 1))) == 'abc';
select concat('a', materialize('b'), toFixedString('c', 1)) == 'abc';
select concat('a', materialize('b'), materialize(toFixedString('c', 1))) == 'abc';
select concat(materialize('a'), 'b', toFixedString('c', 1)) == 'abc';
select concat(materialize('a'), 'b', materialize(toFixedString('c', 1))) == 'abc';
select concat(materialize('a'), materialize('b'), toFixedString('c', 1)) == 'abc';
select concat(materialize('a'), materialize('b'), materialize(toFixedString('c', 1))) == 'abc';
select concat('a', toFixedString('b', 1), 'c') == 'abc';
select concat('a', toFixedString('b', 1), materialize('c')) == 'abc';
select concat('a', materialize(toFixedString('b', 1)), 'c') == 'abc';
select concat('a', materialize(toFixedString('b', 1)), materialize('c')) == 'abc';
select concat(materialize('a'), toFixedString('b', 1), 'c') == 'abc';
select concat(materialize('a'), toFixedString('b', 1), materialize('c')) == 'abc';
select concat(materialize('a'), materialize(toFixedString('b', 1)), 'c') == 'abc';
select concat(materialize('a'), materialize(toFixedString('b', 1)), materialize('c')) == 'abc';
select concat('a', toFixedString('b', 1), toFixedString('c', 1)) == 'abc';
select concat('a', toFixedString('b', 1), materialize(toFixedString('c', 1))) == 'abc';
select concat('a', materialize(toFixedString('b', 1)), toFixedString('c', 1)) == 'abc';
select concat('a', materialize(toFixedString('b', 1)), materialize(toFixedString('c', 1))) == 'abc';
select concat(materialize('a'), toFixedString('b', 1), toFixedString('c', 1)) == 'abc';
select concat(materialize('a'), toFixedString('b', 1), materialize(toFixedString('c', 1))) == 'abc';
select concat(materialize('a'), materialize(toFixedString('b', 1)), toFixedString('c', 1)) == 'abc';
select concat(materialize('a'), materialize(toFixedString('b', 1)), materialize(toFixedString('c', 1))) == 'abc';
select concat(toFixedString('a', 1), 'b', 'c') == 'abc';
select concat(toFixedString('a', 1), 'b', materialize('c')) == 'abc';
select concat(toFixedString('a', 1), materialize('b'), 'c') == 'abc';
select concat(toFixedString('a', 1), materialize('b'), materialize('c')) == 'abc';
select concat(materialize(toFixedString('a', 1)), 'b', 'c') == 'abc';
select concat(materialize(toFixedString('a', 1)), 'b', materialize('c')) == 'abc';
select concat(materialize(toFixedString('a', 1)), materialize('b'), 'c') == 'abc';
select concat(materialize(toFixedString('a', 1)), materialize('b'), materialize('c')) == 'abc';
select concat(toFixedString('a', 1), 'b', toFixedString('c', 1)) == 'abc';
select concat(toFixedString('a', 1), 'b', materialize(toFixedString('c', 1))) == 'abc';
select concat(toFixedString('a', 1), materialize('b'), toFixedString('c', 1)) == 'abc';
select concat(toFixedString('a', 1), materialize('b'), materialize(toFixedString('c', 1))) == 'abc';
select concat(materialize(toFixedString('a', 1)), 'b', toFixedString('c', 1)) == 'abc';
select concat(materialize(toFixedString('a', 1)), 'b', materialize(toFixedString('c', 1))) == 'abc';
select concat(materialize(toFixedString('a', 1)), materialize('b'), toFixedString('c', 1)) == 'abc';
select concat(materialize(toFixedString('a', 1)), materialize('b'), materialize(toFixedString('c', 1))) == 'abc';
select concat(toFixedString('a', 1), toFixedString('b', 1), 'c') == 'abc';
select concat(toFixedString('a', 1), toFixedString('b', 1), materialize('c')) == 'abc';
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1)), 'c') == 'abc';
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1)), materialize('c')) == 'abc';
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1), 'c') == 'abc';
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1), materialize('c')) == 'abc';
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1)), 'c') == 'abc';
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1)), materialize('c')) == 'abc';
select concat(toFixedString('a', 1), toFixedString('b', 1), toFixedString('c', 1)) == 'abc';
select concat(toFixedString('a', 1), toFixedString('b', 1), materialize(toFixedString('c', 1))) == 'abc';
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1)), toFixedString('c', 1)) == 'abc';
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1)), materialize(toFixedString('c', 1))) == 'abc';
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1), toFixedString('c', 1)) == 'abc';
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1), materialize(toFixedString('c', 1))) == 'abc';
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1)), toFixedString('c', 1)) == 'abc';
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1)), materialize(toFixedString('c', 1))) == 'abc';
select concat('a', 'b', 'c') == 'abc' from system.numbers limit 5;
select concat('a', 'b', materialize('c')) == 'abc' from system.numbers limit 5;
select concat('a', materialize('b'), 'c') == 'abc' from system.numbers limit 5;
select concat('a', materialize('b'), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), 'b', 'c') == 'abc' from system.numbers limit 5;
select concat(materialize('a'), 'b', materialize('c')) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), materialize('b'), 'c') == 'abc' from system.numbers limit 5;
select concat(materialize('a'), materialize('b'), materialize('c')) == 'abc' from system.numbers limit 5;
select concat('a', 'b', toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat('a', 'b', materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat('a', materialize('b'), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat('a', materialize('b'), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), 'b', toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), 'b', materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), materialize('b'), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), materialize('b'), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat('a', toFixedString('b', 1), 'c') == 'abc' from system.numbers limit 5;
select concat('a', toFixedString('b', 1), materialize('c')) == 'abc' from system.numbers limit 5;
select concat('a', materialize(toFixedString('b', 1)), 'c') == 'abc' from system.numbers limit 5;
select concat('a', materialize(toFixedString('b', 1)), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), toFixedString('b', 1), 'c') == 'abc' from system.numbers limit 5;
select concat(materialize('a'), toFixedString('b', 1), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), materialize(toFixedString('b', 1)), 'c') == 'abc' from system.numbers limit 5;
select concat(materialize('a'), materialize(toFixedString('b', 1)), materialize('c')) == 'abc' from system.numbers limit 5;
select concat('a', toFixedString('b', 1), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat('a', toFixedString('b', 1), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat('a', materialize(toFixedString('b', 1)), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat('a', materialize(toFixedString('b', 1)), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), toFixedString('b', 1), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), toFixedString('b', 1), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), materialize(toFixedString('b', 1)), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(materialize('a'), materialize(toFixedString('b', 1)), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), 'b', 'c') == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), 'b', materialize('c')) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize('b'), 'c') == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize('b'), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), 'b', 'c') == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), 'b', materialize('c')) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize('b'), 'c') == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize('b'), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), 'b', toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), 'b', materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize('b'), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize('b'), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), 'b', toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), 'b', materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize('b'), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize('b'), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), toFixedString('b', 1), 'c') == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), toFixedString('b', 1), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1)), 'c') == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1)), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1), 'c') == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1)), 'c') == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1)), materialize('c')) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), toFixedString('b', 1), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), toFixedString('b', 1), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1)), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(toFixedString('a', 1), materialize(toFixedString('b', 1)), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), toFixedString('b', 1), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1)), toFixedString('c', 1)) == 'abc' from system.numbers limit 5;
select concat(materialize(toFixedString('a', 1)), materialize(toFixedString('b', 1)), materialize(toFixedString('c', 1))) == 'abc' from system.numbers limit 5;

View File

@ -1,40 +1,49 @@
#include <Yandex/time2str.h>
#include <cstring>
#include <Yandex/DateLUT.h>
#include <Poco/Exception.h>
DateLUT::DateLUT()
{
/** Дополнительный вызов Time2Date для случая, когда в 1981-1984 году в России,
* 1 апреля начиналось в час ночи, не в полночь.
*/
size_t i = 0;
for (time_t t = Time2Date(DATE_LUT_MIN);
t <= DATE_LUT_MAX;
t = Time2Date(TimeDayShift(t)))
time_t start_of_day = DATE_LUT_MIN;
do
{
if (i > DATE_LUT_MAX_DAY_NUM)
throw Poco::Exception("Cannot create DateLUT: i > DATE_LUT_MAX_DAY_NUM.");
tm time_descr;
localtime_r(&start_of_day, &time_descr);
time_descr.tm_hour = 0;
time_descr.tm_min = 0;
time_descr.tm_sec = 0;
time_descr.tm_isdst = -1;
start_of_day = mktime(&time_descr);
Values & values = lut[i];
struct tm tm;
localtime_r(&t, &tm);
values.year = time_descr.tm_year + 1900;
values.month = time_descr.tm_mon + 1;
values.day_of_week = time_descr.tm_wday == 0 ? 7 : time_descr.tm_wday;
values.day_of_month = time_descr.tm_mday;
values.year = tm.tm_year + 1900;
values.month = tm.tm_mon + 1;
values.day_of_week = tm.tm_wday == 0 ? 7 : tm.tm_wday;
values.day_of_month = tm.tm_mday;
values.date = start_of_day;
tm.tm_hour = 0;
tm.tm_min = 0;
tm.tm_sec = 0;
tm.tm_isdst = -1;
/// Переходим на следующий день.
++time_descr.tm_mday;
values.date = mktime(&tm);
/** Обратите внимание, что в 1981-1984 году в России,
* 1 апреля начиналось в час ночи, а не в полночь.
* Если здесь оставить час равным нулю, то прибавление единицы к дню, привело бы к 23 часам того же дня.
*/
time_descr.tm_hour = 12;
start_of_day = mktime(&time_descr);
++i;
}
} while (start_of_day <= DATE_LUT_MAX);
/// Заполняем lookup таблицу для годов
memset(years_lut, 0, DATE_LUT_YEARS * sizeof(years_lut[0]));

View File

@ -20,6 +20,7 @@ int main(int argc, char ** argv)
{
loop(OrderedIdentifier2Date(20101031), OrderedIdentifier2Date(20101101), 15 * 60);
loop(OrderedIdentifier2Date(20100328), OrderedIdentifier2Date(20100330), 15 * 60);
loop(OrderedIdentifier2Date(20141020), OrderedIdentifier2Date(20141106), 15 * 60);
return 0;
}

View File

@ -0,0 +1,8 @@
#include <Yandex/DateLUT.h>

/// Allows checking the initialization time of DateLUT:
/// the first call to instance() constructs the singleton and fills its
/// lookup tables, so timing this program measures that one-time cost.
int main(int argc, char ** argv)
{
DateLUT::instance();
return 0;
}

View File

@ -1,6 +1,5 @@
#include <iostream>
#include <mysqlxx/mysqlxx.h>
#include <Yandex/time2str.h>
int main(int argc, char ** argv)
@ -23,8 +22,8 @@ int main(int argc, char ** argv)
std::cerr << row[1] << ", " << row["d"]
<< ", " << row[1].getDate()
<< ", " << row[1].getDateTime()
<< ", " << Date2Str(row[1].getDate())
<< ", " << Time2Str(row[1].getDateTime())
<< ", " << row[1].getDate()
<< ", " << row[1].getDateTime()
<< std::endl
<< mysqlxx::escape << row[1].getDate() << ", " << mysqlxx::escape << row[1].getDateTime() << std::endl
<< mysqlxx::quote << row[1].getDate() << ", " << mysqlxx::quote << row[1].getDateTime() << std::endl

View File

@ -18,6 +18,6 @@ add_custom_target(${daemonname}-logrotate.target ALL
INSTALL(
FILES ${CMAKE_CURRENT_BINARY_DIR}/${daemonname}.logrt
DESTINATION /etc/logrotate.d
PERMISSIONS OWNER_EXECUTE OWNER_READ GROUP_EXECUTE GROUP_READ WORLD_EXECUTE WORLD_READ
PERMISSIONS OWNER_READ OWNER_WRITE GROUP_READ WORLD_READ
)
endmacro (CREATE_LOGROTATE)