22 #include <sstream> |
22 #include <sstream> |
23 #include <vector> |
23 #include <vector> |
24 #include <locale> |
24 #include <locale> |
25 #include <codecvt> |
25 #include <codecvt> |
26 #include <unistd.h> |
26 #include <unistd.h> |
|
27 #include <cassert> |
27 |
28 |
28 #include <sqlite3.h> |
29 #include <sqlite3.h> |
29 |
30 |
30 #include <relpipe/reader/typedefs.h> |
31 #include <relpipe/reader/typedefs.h> |
31 #include <relpipe/reader/TypeId.h> |
32 #include <relpipe/reader/TypeId.h> |
32 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
33 #include <relpipe/reader/handlers/RelationalReaderValueHandler.h> |
33 #include <relpipe/reader/handlers/AttributeMetadata.h> |
34 #include <relpipe/reader/handlers/AttributeMetadata.h> |
34 |
35 |
35 #include <relpipe/writer/Factory.h> |
36 #include <relpipe/writer/Factory.h> |
36 |
37 |
37 #include "Configuration.h" |
38 #include "Configuration.h" |
55 PreparedStatement(sqlite3_stmt* stmt) : stmt(stmt) { |
56 PreparedStatement(sqlite3_stmt* stmt) : stmt(stmt) { |
56 } |
57 } |
57 |
58 |
58 virtual ~PreparedStatement() { |
59 virtual ~PreparedStatement() { |
59 sqlite3_finalize(stmt); |
60 sqlite3_finalize(stmt); |
|
61 } |
|
62 |
|
63 void setBoolean(int parameterIndex, relpipe::reader::boolean_t value) { |
|
64 int result = sqlite3_bind_int(stmt, parameterIndex, value); |
|
65 if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter."); |
|
66 } |
|
67 |
|
68 void setInteger(int parameterIndex, relpipe::reader::integer_t value) { |
|
69 int result = sqlite3_bind_int64(stmt, parameterIndex, value); |
|
70 if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter."); |
60 } |
71 } |
61 |
72 |
62 void setString(int parameterIndex, std::string value) { |
73 void setString(int parameterIndex, std::string value) { |
63 int result = sqlite3_bind_text(stmt, parameterIndex, value.c_str(), -1, SQLITE_TRANSIENT); |
74 int result = sqlite3_bind_text(stmt, parameterIndex, value.c_str(), -1, SQLITE_TRANSIENT); |
64 if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter."); |
75 if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter."); |
125 else throw SqlException(L"Unable to prepare SQLite statement."); |
136 else throw SqlException(L"Unable to prepare SQLite statement."); |
126 } |
137 } |
127 |
138 |
128 }; |
139 }; |
129 |
140 |
130 class SqlHandler : public RelationalReaderStringHandler { |
141 class SqlHandler : public RelationalReaderValueHandler { |
131 private: |
142 private: |
132 Configuration configuration; |
143 Configuration configuration; |
133 writer::RelationalWriter* relationalWriter; |
144 writer::RelationalWriter* relationalWriter; |
134 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings |
145 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings |
135 vector<AttributeMetadata> currentReaderMetadata; |
146 vector<AttributeMetadata> currentReaderMetadata; |
157 } |
168 } |
158 } |
169 } |
159 } |
170 } |
160 |
171 |
161 relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) { |
172 relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) { |
162 if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"boolean"; |
173 if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: map selected values back to booleans or allow optional storage as string |
163 else if (typeId == relpipe::reader::TypeId::INTEGER) return L"integer"; |
174 else if (typeId == relpipe::reader::TypeId::INTEGER) return L"integer"; |
164 else return L"text"; |
175 else return L"text"; |
165 } |
176 } |
166 |
177 |
167 void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) { |
178 void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) { |
212 } |
223 } |
213 sql << L")"; |
224 sql << L")"; |
214 currentInsert.reset(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str())); |
225 currentInsert.reset(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str())); |
215 } |
226 } |
216 |
227 |
217 void attribute(const string_t& value) override { |
228 void attribute(const void* value, const std::type_info& typeInfo) override { |
|
229 relpipe::reader::TypeId type = currentReaderMetadata[currentAttributeIndex].getTypeId(); |
218 currentAttributeIndex++; |
230 currentAttributeIndex++; |
219 if (currentAttributeIndex % currentReaderMetadata.size()) { |
231 |
220 currentInsert->setString(currentAttributeIndex, convertor.to_bytes(value).c_str()); |
232 switch (type) { |
221 } else { |
233 case relpipe::reader::TypeId::BOOLEAN: |
|
234 { |
|
235 assert(typeInfo == typeid (boolean_t)); |
|
236 auto* typedValue = static_cast<const boolean_t*> (value); |
|
237 currentInsert->setBoolean(currentAttributeIndex, *typedValue); |
|
238 break; |
|
239 } |
|
240 case relpipe::reader::TypeId::INTEGER: |
|
241 { |
|
242 assert(typeInfo == typeid (integer_t)); |
|
243 auto* typedValue = static_cast<const integer_t*> (value); |
|
244 currentInsert->setInteger(currentAttributeIndex, *typedValue); |
|
245 break; |
|
246 } |
|
247 case relpipe::reader::TypeId::STRING: |
|
248 { |
|
249 assert(typeInfo == typeid (string_t)); |
|
250 auto* typedValue = static_cast<const string_t*> (value); |
|
251 currentInsert->setString(currentAttributeIndex, convertor.to_bytes(*typedValue).c_str()); |
|
252 break; |
|
253 } |
|
254 default: |
|
255 throw SqlException(L"Unsupported type in attribute()"); |
|
256 } |
|
257 |
|
258 if (currentAttributeIndex % currentReaderMetadata.size() == 0) { |
222 currentInsert->next(); |
259 currentInsert->next(); |
223 currentInsert->reset(); |
260 currentInsert->reset(); |
224 currentAttributeIndex = 0; |
261 currentAttributeIndex = 0; |
225 } |
262 } |
226 } |
263 } |