src/SqlHandler.h
branchv_0
changeset 6 32b4293307f4
parent 5 cbc7817a3346
child 7 9119b29d1e7c
equal deleted inserted replaced
5:cbc7817a3346 6:32b4293307f4
    17  */
    17  */
    18 #pragma once
    18 #pragma once
    19 
    19 
    20 #include <memory>
    20 #include <memory>
    21 #include <string>
    21 #include <string>
       
    22 #include <sstream>
    22 #include <vector>
    23 #include <vector>
    23 #include <locale>
    24 #include <locale>
    24 #include <codecvt>
    25 #include <codecvt>
    25 #include <unistd.h>
    26 #include <unistd.h>
    26 
    27 
   125 private:
   126 private:
   126 	Configuration configuration;
   127 	Configuration configuration;
   127 	writer::RelationalWriter* relationalWriter;
   128 	writer::RelationalWriter* relationalWriter;
   128 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
   129 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
   129 	std::unique_ptr<Connection> connection;
   130 	std::unique_ptr<Connection> connection;
       
   131 	relpipe::writer::string_t currentInsert;
   130 
   132 
   131 	void processStatement(const Statement& statement) {
   133 	void processStatement(const Statement& statement) {
   132 		PreparedStatement prepared = connection->prepareStatement(convertor.to_bytes(statement.sql).c_str());
   134 		PreparedStatement prepared = connection->prepareStatement(convertor.to_bytes(statement.sql).c_str());
   133 		int columnCount = prepared.getColumnCount();
   135 		int columnCount = prepared.getColumnCount();
   134 		int parameterCount = statement.parameters.size();
   136 		int parameterCount = statement.parameters.size();
   147 				relationalWriter->writeAttribute(convertor.from_bytes(prepared.getString(i)));
   149 				relationalWriter->writeAttribute(convertor.from_bytes(prepared.getString(i)));
   148 			}
   150 			}
   149 		}
   151 		}
   150 	}
   152 	}
   151 
   153 
       
   154 	relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) {
       
   155 		if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"boolean";
       
   156 		else if (typeId == relpipe::reader::TypeId::INTEGER) return L"integer";
       
   157 		else return L"text";
       
   158 	}
       
   159 
       
   160 	void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) {
       
   161 		for (auto & ch : identifier) {
       
   162 			if (ch == L'"') output << L"\"\"";
       
   163 			else output << ch;
       
   164 		}
       
   165 	}
       
   166 
   152 public:
   167 public:
   153 
   168 
   154 	SqlHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
   169 	SqlHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
   155 		std::string file = configuration.file.size() ? convertor.to_bytes(configuration.file) : ":memory:";
   170 		std::string file = configuration.file.size() ? convertor.to_bytes(configuration.file) : ":memory:";
   156 		connection.reset(new Connection(file.c_str()));
   171 		connection.reset(new Connection(file.c_str()));
   158 
   173 
   159 	virtual ~SqlHandler() {
   174 	virtual ~SqlHandler() {
   160 	}
   175 	}
   161 
   176 
   162 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
   177 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
   163 
   178 		std::wstringstream sql;
       
   179 		sql << L"CREATE TABLE ";
       
   180 		writeIdentifier(sql, name);
       
   181 		sql << L" (\n";
       
   182 		for (int i = 0; i < attributes.size(); i++) {
       
   183 			sql << L"\t";
       
   184 			writeIdentifier(sql, attributes[i].getAttributeName());
       
   185 			sql << L" " << toSQLType(attributes[i].getTypeId());
       
   186 			if (i < attributes.size() - 1) sql << L",\n";
       
   187 		}
       
   188 		sql << L"\n)";
       
   189 
       
   190 		PreparedStatement createTable = connection->prepareStatement(convertor.to_bytes(sql.str()).c_str());
       
   191 		createTable.next();
   164 	}
   192 	}
   165 
   193 
   166 	void attribute(const string_t& value) override {
   194 	void attribute(const string_t& value) override {
   167 
   195 
   168 	}
   196 	}
   169 
   197 
   170 	void endOfPipe() {
   198 	void endOfPipe() {
   171 		for (const Statement& statement : configuration.statements) processStatement(statement);
   199 		for (const Statement& statement : configuration.statements) processStatement(statement);
   172 		
   200 
   173 		if (configuration.file.size() && !configuration.keepFile) {
   201 		if (configuration.file.size() && !configuration.keepFile) {
   174 			int result = unlink(convertor.to_bytes(configuration.file).c_str());
   202 			int result = unlink(convertor.to_bytes(configuration.file).c_str());
   175 			if (result) throw SqlException(L"Unable to delete SQLite file.");
   203 			if (result) throw SqlException(L"Unable to delete SQLite file.");
   176 		}
   204 		}
   177 	}
   205 	}