clickhouse: added Chunks storage [#CONV-6705].

Michael Kolupaev 2013-02-07 13:03:19 +00:00
parent d24d0f7f78
commit 6f2c6559c8
8 changed files with 254 additions and 13 deletions

View File

@@ -181,6 +181,9 @@ namespace ErrorCodes
CANNOT_CREATE_DIRECTORY,
CANNOT_ALLOCATE_MEMORY,
CYCLIC_ALIASES,
NEGATIVE_REFCOUNT,
CHUNK_NOT_FOUND,
DUPLICATE_CHUNK_NAME,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@@ -0,0 +1,71 @@
#pragma once
#include <DB/Storages/StorageLog.h>
#include <DB/Interpreters/Context.h>
#include <statdaemons/CounterInFile.h>
namespace DB
{
/** Stores several chunks of data. Reads from all chunks.
* Writing is not supported. Tables of type ChunkMerger are used for writing.
* Tables of type ChunkRef can reference individual chunks inside a Chunks table.
* Keeps a count of referencing ChunkRef tables, and drops itself when that count reaches zero.
*/
class StorageChunks : public StorageLog
{
public:
static StoragePtr create(const std::string & path_, const std::string & name_, const std::string & database_name_, NamesAndTypesListPtr columns_, Context & context_);
void addReference();
void removeReference();
std::string getName() const { return "Chunks"; }
BlockInputStreams readFromChunk(
const std::string & chunk_name,
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
BlockOutputStreamPtr writeToNewChunk(
const std::string & chunk_name);
/// If writing were allowed, it would be unclear how to name the new chunk.
BlockOutputStreamPtr write(
ASTPtr query)
{
throw Exception("Table doesn't support writing", ErrorCodes::NOT_IMPLEMENTED);
}
/// Renaming would break the integrity of the reference counts kept by ChunkRef tables.
void rename(const String & new_path_to_db, const String & new_name)
{
throw Exception("Table doesn't support renaming", ErrorCodes::NOT_IMPLEMENTED);
}
private:
typedef std::vector<size_t> Marks;
typedef std::map<String, size_t> ChunkIndices;
String database_name;
bool index_loaded;
Marks marks; /// Starting mark of each chunk, in insertion order.
ChunkIndices chunk_indices; /// Chunk name -> position in marks.
CounterInFile reference_counter; /// Number of referencing ChunkRef tables, persisted in refcount.txt.
Context context;
StorageChunks(const std::string & path_, const std::string & name_, const std::string & database_name_, NamesAndTypesListPtr columns_, Context & context_);
void dropThis();
void loadIndex();
void appendChunkToIndex(const std::string & chunk_name, size_t mark);
};
}
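A note on the reference-counting contract above: the sketch below illustrates the lifecycle described in the class comment. FileCounter is a hypothetical stand-in for statdaemons' CounterInFile, which persists the count in refcount.txt so that it survives server restarts; dropSelf() stands in for dropThis(), which issues a DROP query against the table itself (see StorageChunks.cpp below).

// Minimal lifecycle sketch (toy types, not the commit's code).
#include <stdexcept>

struct FileCounter // stand-in for CounterInFile; the real one persists to refcount.txt
{
    long long value;
    FileCounter() : value(0) {}
    long long add(long long delta) { return value += delta; }
};

class ChunksSketch
{
public:
    void addReference() { counter.add(1); }
    void removeReference()
    {
        long long c = counter.add(-1);
        if (c < 0)
            throw std::logic_error("Negative refcount");
        if (c == 0)
            dropSelf(); // in the commit: dropThis() runs a DROP query on this table
    }
private:
    FileCounter counter;
    void dropSelf() {} // placeholder
};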

View File

@@ -18,6 +18,7 @@ public:
const String & name,
const String & data_path,
const String & table_name,
const String & database_name,
Context & context,
ASTPtr & query,
NamesAndTypesListPtr columns,

View File

@@ -140,11 +140,29 @@ public:
void rename(const String & new_path_to_db, const String & new_name);
private:
protected:
String path;
String name;
NamesAndTypesListPtr columns;
Poco::RWLock rwlock;
StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_);
void loadMarks();
size_t marksCount();
BlockInputStreams read(
size_t from_mark,
size_t to_mark,
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
private:
/// Column data
struct ColumnData
{
@@ -154,10 +172,6 @@
};
typedef std::map<String, ColumnData> Files_t;
Files_t files;
Poco::RWLock rwlock;
StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_);
void addFile(const String & column_name, const IDataType & type, size_t level = 0);
@@ -165,7 +179,6 @@ private:
* Done lazily, so that the server starts quickly even when there are many tables.
*/
bool loaded_marks;
void loadMarks();
};
}
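What this hunk does: rwlock, the constructor, loadMarks(), the new marksCount(), and a mark-range read() move from private to protected, so that a subclass can serve named sub-ranges of a single physical log (StorageChunks below does exactly this). A toy sketch of the pattern, with hypothetical LogBase/ChunkedLog in place of StorageLog/StorageChunks:

// Sketch of the refactoring pattern in this header (toy types, not the commit's code).
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

class LogBase
{
protected:
    std::vector<int> rows; // stands in for the log's marks/files
    std::vector<int> read(size_t from, size_t to) // protected ranged read
    {
        return std::vector<int>(rows.begin() + from, rows.begin() + to);
    }
public:
    std::vector<int> read() { return read(0, rows.size()); } // public read = full range
};

class ChunkedLog : public LogBase
{
    typedef std::pair<size_t, size_t> Range; // [from, to) in marks
    std::map<std::string, Range> chunks;
public:
    std::vector<int> readChunk(const std::string & name)
    {
        Range r = chunks.find(name)->second; // the real code throws CHUNK_NOT_FOUND if missing
        return read(r.first, r.second);
    }
};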

View File

@@ -162,7 +162,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
else
throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::ENGINE_REQUIRED);
res = context.getStorageFactory().get(storage_name, data_path, table_name, context, query_ptr, columns, create.attach);
res = context.getStorageFactory().get(storage_name, data_path, table_name, database_name, context, query_ptr, columns, create.attach);
/// Check for the presence of the table's metadata on disk and create the metadata

View File

@@ -0,0 +1,123 @@
#include <DB/Storages/StorageChunks.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/InterpreterDropQuery.h>
#include <DB/Parsers/ASTDropQuery.h>
#include <Poco/File.h>
namespace DB
{
StoragePtr StorageChunks::create(const std::string & path_, const std::string & name_, const std::string & database_name_, NamesAndTypesListPtr columns_, Context & context_)
{
return (new StorageChunks(path_, name_, database_name_, columns_, context_))->thisPtr();
}
void StorageChunks::addReference()
{
reference_counter.add(1, true);
}
void StorageChunks::removeReference()
{
Int64 c = reference_counter.add(-1, false);
if (c < 0)
throw Exception("Negative refcount on table " + getName(), ErrorCodes::NEGATIVE_REFCOUNT);
if (c == 0)
dropThis();
}
BlockInputStreams StorageChunks::readFromChunk(
const std::string & chunk_name,
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
{
loadIndex();
size_t mark1;
size_t mark2;
{
Poco::ScopedReadRWLock lock(rwlock);
if (!chunk_indices.count(chunk_name))
throw Exception("No chunk " + chunk_name + " in table " + getName(), ErrorCodes::CHUNK_NOT_FOUND);
size_t index = chunk_indices[chunk_name];
mark1 = marks[index];
mark2 = index + 1 == marks.size() ? marksCount() : marks[index + 1];
}
return read(mark1, mark2, column_names, query, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageChunks::writeToNewChunk(
const std::string & chunk_name)
{
loadIndex();
{
Poco::ScopedWriteRWLock lock(rwlock);
if (chunk_indices.count(chunk_name))
throw Exception("Duplicate chunk name in table " + getName(), ErrorCodes::DUPLICATE_CHUNK_NAME);
size_t mark = marksCount();
chunk_indices[chunk_name] = marks.size();
appendChunkToIndex(chunk_name, mark);
marks.push_back(mark);
}
return StorageLog::write(ASTPtr());
}
StorageChunks::StorageChunks(const std::string & path_, const std::string & name_, const std::string & database_name_, NamesAndTypesListPtr columns_, Context & context_)
: StorageLog(path_, name_, columns_), database_name(database_name_), index_loaded(false), reference_counter(path_ + escapeForFileName(name_) + "/refcount.txt"), context(context_) {}
void StorageChunks::loadIndex()
{
loadMarks();
Poco::ScopedWriteRWLock lock(rwlock);
if (index_loaded)
return;
index_loaded = true;
String index_path = path + escapeForFileName(name) + "/chunks.chn";
/// The index may not exist yet for a freshly created table.
if (!Poco::File(index_path).exists())
return;
ReadBufferFromFile index(index_path, 4096);
while (!index.eof())
{
String chunk_name;
UInt64 mark;
readStringBinary(chunk_name, index);
readIntBinary(mark, index);
chunk_indices[chunk_name] = marks.size();
marks.push_back(mark);
}
}
void StorageChunks::appendChunkToIndex(const std::string & chunk_name, size_t mark)
{
String index_path = path + escapeForFileName(name) + "/chunks.chn";
WriteBufferFromFile index(index_path, 4096, O_APPEND | O_CREAT | O_WRONLY);
writeStringBinary(chunk_name, index);
writeIntBinary<UInt64>(mark, index);
}
void StorageChunks::dropThis()
{
/// The table drops itself by executing a DROP query on its own name.
ASTDropQuery * query = new ASTDropQuery();
ASTPtr query_ptr = query;
query->detach = false;
query->if_exists = false;
query->database = database_name;
query->table = name;
InterpreterDropQuery interpreter(query_ptr, context);
interpreter.execute();
}
}
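For reference, chunks.chn as written by appendChunkToIndex is a flat, append-only sequence of (chunk name, starting mark) records. Assuming the usual WriteHelpers encoding (writeStringBinary emits a varint length followed by the raw bytes; writeIntBinary<UInt64> emits the raw 8-byte value), a standalone dump tool can be sketched without the DB:: buffer classes. This is an illustration of the assumed format, not part of the commit:

// Sketch: decode chunks.chn with plain C++ (assumed encoding, see above).
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>

// Varint decoder matching the assumed format: 7 data bits per byte,
// low bits first, high bit set = continuation.
static bool readVarUInt(std::istream & in, uint64_t & x)
{
    x = 0;
    for (int shift = 0; shift < 64; shift += 7)
    {
        int byte = in.get();
        if (byte == EOF)
            return false;
        x |= static_cast<uint64_t>(byte & 0x7F) << shift;
        if (!(byte & 0x80))
            return true;
    }
    return false;
}

int main()
{
    std::ifstream in("chunks.chn", std::ios::binary);
    uint64_t len;
    while (readVarUInt(in, len)) // each record: chunk name, then starting mark
    {
        std::string name(static_cast<size_t>(len), '\0');
        in.read(&name[0], static_cast<std::streamsize>(len));
        uint64_t mark;
        in.read(reinterpret_cast<char *>(&mark), sizeof(mark)); // raw 8-byte integer
        if (!in)
            break;
        std::cout << name << " -> mark " << mark << "\n";
    }
    return 0;
}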

View File

@@ -16,6 +16,7 @@
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemOne.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/StorageChunks.h>
namespace DB
@@ -26,6 +27,7 @@ StoragePtr StorageFactory::get(
const String & name,
const String & data_path,
const String & table_name,
const String & database_name,
Context & context,
ASTPtr & query,
NamesAndTypesListPtr columns,
@@ -35,6 +37,10 @@ StoragePtr StorageFactory::get(
{
return StorageLog::create(data_path, table_name, columns);
}
else if (name == "Chunks")
{
return StorageChunks::create(data_path, table_name, database_name, columns, context);
}
else if (name == "TinyLog")
{
return StorageTinyLog::create(data_path, table_name, columns, attach);

View File

@@ -286,6 +286,12 @@ void StorageLog::loadMarks()
}
size_t StorageLog::marksCount()
{
return files.begin()->second.marks.size();
}
void StorageLog::rename(const String & new_path_to_db, const String & new_name)
{
Poco::ScopedWriteRWLock lock(rwlock);
@@ -305,6 +311,8 @@ void StorageLog::rename(const String & new_path_to_db, const String & new_name)
BlockInputStreams StorageLog::read(
size_t from_mark,
size_t to_mark,
const Names & column_names,
ASTPtr query,
const Settings & settings,
@@ -321,8 +329,11 @@ BlockInputStreams StorageLog::read(
const Marks & marks = files.begin()->second.marks;
size_t marks_size = marks.size();
if (threads > marks_size)
threads = marks_size;
if (to_mark > marks_size || to_mark < from_mark)
throw Exception("Marks out of range in StorageLog::read", ErrorCodes::LOGICAL_ERROR);
if (threads > to_mark - from_mark)
threads = to_mark - from_mark;
BlockInputStreams res;
@@ -332,15 +343,28 @@ BlockInputStreams StorageLog::read(
max_block_size,
column_names,
thisPtr(),
thread * marks_size / threads,
thread == 0
? marks[marks_size / threads - 1].rows
: (marks[(thread + 1) * marks_size / threads - 1].rows - marks[thread * marks_size / threads - 1].rows)));
from_mark + thread * (to_mark - from_mark) / threads,
marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows -
(from_mark + thread * (to_mark - from_mark) / threads == 0
? 0
: marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows)));
}
return res;
}
BlockInputStreams StorageLog::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
{
return read(0, marksCount(), column_names, query, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageLog::write(
ASTPtr query)
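The new mark-range partitioning assigns thread t the half-open mark range [from_mark + t * (to_mark - from_mark) / threads, from_mark + (t + 1) * (to_mark - from_mark) / threads); its row budget is the difference of the cumulative marks[].rows values at the two range boundaries. The toy program below checks just that arithmetic (not the commit's code; note the base term tests begin == 0 rather than thread == 0, matching the fix above, so a nonzero from_mark is handled):

// Toy check of the per-thread mark partitioning.
#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
    // marks[i].rows in the real code: cumulative row count up to the end of mark i.
    std::vector<size_t> cum_rows;
    cum_rows.push_back(10); cum_rows.push_back(25); cum_rows.push_back(30);
    cum_rows.push_back(70); cum_rows.push_back(90); cum_rows.push_back(120);

    size_t from_mark = 1, to_mark = 5, threads = 2;
    if (threads > to_mark - from_mark)
        threads = to_mark - from_mark; // same clamp as StorageLog::read

    for (size_t t = 0; t < threads; ++t)
    {
        size_t begin = from_mark + t * (to_mark - from_mark) / threads;
        size_t end = from_mark + (t + 1) * (to_mark - from_mark) / threads;
        size_t rows = cum_rows[end - 1] - (begin == 0 ? 0 : cum_rows[begin - 1]);
        std::printf("thread %zu: marks [%zu, %zu), %zu rows\n", t, begin, end, rows);
    }
    return 0;
}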