dbms: development.

This commit is contained in:
Alexey Milovidov 2011-10-31 17:55:06 +00:00
parent 169ef6bf2e
commit 67b02812f6
9 changed files with 152 additions and 20 deletions

View File

@ -1,5 +1,4 @@
#ifndef DBMS_CORE_NAMES_AND_TYPES_H
#define DBMS_CORE_NAMES_AND_TYPES_H
#pragma once
#include <map>
#include <string>
@ -15,7 +14,6 @@ namespace DB
using Poco::SharedPtr;
typedef std::map<std::string, DataTypePtr> NamesAndTypes;
typedef SharedPtr<NamesAndTypes> NamesAndTypesPtr;
}
#endif

View File

@ -76,7 +76,7 @@ public:
/** ALTER таблицы в виде изменения столбцов, не затрагивающий изменение Storage или его параметров.
* (ALTER, затрагивающий изменение движка, делается внешним кодом, путём копирования данных.)
*/
virtual void alter(SharedPtr<NamesAndTypes> columns)
virtual void alter(NamesAndTypesPtr columns)
{
throw Exception("Method alter() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -18,7 +18,7 @@ public:
const String & name,
const String & data_path,
const String & table_name,
SharedPtr<NamesAndTypes> columns) const;
NamesAndTypesPtr columns) const;
};
typedef SharedPtr<StorageFactory> StorageFactoryPtr;

View File

@ -1,9 +1,5 @@
#ifndef DBMS_STORAGES_STORAGE_LOG_H
#define DBMS_STORAGES_STORAGE_LOG_H
#pragma once
#include <map>
#include <Poco/SharedPtr.h>
#include <Poco/File.h>
#include <DB/Core/NamesAndTypes.h>
@ -18,7 +14,6 @@
namespace DB
{
using Poco::SharedPtr;
class StorageLog;
class LogBlockInputStream : public IProfilingBlockInputStream
@ -84,7 +79,7 @@ public:
* (корректность имён и путей не проверяется)
* состоящую из указанных столбцов; создать файлы, если их нет.
*/
StorageLog(const std::string & path_, const std::string & name_, SharedPtr<NamesAndTypes> columns_,
StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesPtr columns_,
const std::string & extension_ = ".bin");
std::string getName() const { return "Log"; }
@ -103,7 +98,7 @@ public:
private:
const std::string path;
const std::string name;
SharedPtr<NamesAndTypes> columns;
NamesAndTypesPtr columns;
const std::string extension;
typedef std::map<std::string, Poco::File> Files_t;
@ -111,5 +106,3 @@ private:
};
}
#endif

View File

@ -0,0 +1,73 @@
#pragma once
#include <DB/Core/NamesAndTypes.h>
#include <DB/Storages/IStorage.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
class StorageMemory;
typedef std::vector<Block> Blocks;
class MemoryBlockInputStream : public IProfilingBlockInputStream
{
public:
MemoryBlockInputStream(const Names & column_names_, StorageMemory & storage_);
Block readImpl();
String getName() const { return "MemoryBlockInputStream"; }
BlockInputStreamPtr clone() { return new MemoryBlockInputStream(column_names, storage); }
private:
Names column_names;
StorageMemory & storage;
Blocks::iterator it;
};
class MemoryBlockOutputStream : public IBlockOutputStream
{
public:
MemoryBlockOutputStream(StorageMemory & storage_);
void write(const Block & block);
BlockOutputStreamPtr clone() { return new MemoryBlockOutputStream(storage); }
private:
StorageMemory & storage;
};
/** Реализует хранилище в оперативке.
* Подходит для временных данных.
* В нём не поддерживаются ключи.
* Данные хранятся в виде набора блоков и никуда дополнительно не сохраняются.
*/
class StorageMemory : public IStorage
{
friend class MemoryBlockInputStream;
friend class MemoryBlockOutputStream;
public:
StorageMemory(const std::string & name_, NamesAndTypesPtr columns_);
std::string getName() const { return "Memory"; }
std::string getTableName() const { return name; }
const NamesAndTypes & getColumns() const { return *columns; }
BlockInputStreamPtr read(
const Names & column_names,
ASTPtr query,
size_t max_block_size = DEFAULT_BLOCK_SIZE);
BlockOutputStreamPtr write(
ASTPtr query);
private:
const std::string name;
NamesAndTypesPtr columns;
/// Сами данные
Blocks data;
};
}

View File

@ -22,7 +22,7 @@ StoragePtr InterpreterCreateQuery::execute(ASTPtr query, Context & context)
String as_database_name = create.as_database.empty() ? context.current_database : create.as_database;
String as_table_name = create.as_table;
SharedPtr<NamesAndTypes> columns = new NamesAndTypes;
NamesAndTypesPtr columns = new NamesAndTypes;
String data_path = context.path + "data/" + database_name + "/"; /// TODO: эскейпинг
String metadata_path = context.path + "metadata/" + database_name + "/" + table_name + ".sql";
@ -84,7 +84,7 @@ StoragePtr InterpreterCreateQuery::execute(ASTPtr query, Context & context)
else
{
Poco::FileOutputStream metadata_file(metadata_path);
metadata_file << "ATTACH TABLE " << database_name << "." << table_name << "\n"
metadata_file << "ATTACH TABLE " << table_name << "\n"
<< "(\n";
for (NamesAndTypes::const_iterator it = columns->begin(); it != columns->end(); ++it)

View File

@ -1,4 +1,5 @@
#include <DB/Storages/StorageLog.h>
#include <DB/Storages/StorageMemory.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemOne.h>
#include <DB/Storages/StorageFactory.h>
@ -12,10 +13,12 @@ StoragePtr StorageFactory::get(
const String & name,
const String & data_path,
const String & table_name,
SharedPtr<NamesAndTypes> columns) const
NamesAndTypesPtr columns) const
{
if (name == "Log")
return new StorageLog(data_path, table_name, columns);
else if (name == "Memory")
return new StorageMemory(table_name, columns);
else if (name == "SystemNumbers")
{
if (columns->size() != 1 || columns->begin()->first != "number" || columns->begin()->second->getName() != "UInt64")

View File

@ -60,7 +60,7 @@ void LogBlockOutputStream::write(const Block & block)
}
StorageLog::StorageLog(const std::string & path_, const std::string & name_, SharedPtr<NamesAndTypes> columns_,
StorageLog::StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesPtr columns_,
const std::string & extension_)
: path(path_), name(name_), columns(columns_), extension(extension_)
{

View File

@ -0,0 +1,65 @@
#include <map>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Storages/StorageMemory.h>
namespace DB
{
using Poco::SharedPtr;
MemoryBlockInputStream::MemoryBlockInputStream(const Names & column_names_, StorageMemory & storage_)
: column_names(column_names_), storage(storage_), it(storage.data.begin())
{
}
Block MemoryBlockInputStream::readImpl()
{
if (it == storage.data.end())
return Block();
else
return *it++;
}
MemoryBlockOutputStream::MemoryBlockOutputStream(StorageMemory & storage_)
: storage(storage_)
{
}
void MemoryBlockOutputStream::write(const Block & block)
{
storage.check(block);
storage.data.push_back(block);
}
StorageMemory::StorageMemory(const std::string & name_, NamesAndTypesPtr columns_)
: name(name_), columns(columns_)
{
}
BlockInputStreamPtr StorageMemory::read(
const Names & column_names,
ASTPtr query,
size_t max_block_size)
{
check(column_names);
return new MemoryBlockInputStream(column_names, *this);
}
BlockOutputStreamPtr StorageMemory::write(
ASTPtr query)
{
return new MemoryBlockOutputStream(*this);
}
}