57 Configuration configuration; |
57 Configuration configuration; |
58 relpipe::common::type::StringX currentRelationName; |
58 relpipe::common::type::StringX currentRelationName; |
59 std::vector<relpipe::reader::handlers::AttributeMetadata> currentReaderMetadata; |
59 std::vector<relpipe::reader::handlers::AttributeMetadata> currentReaderMetadata; |
60 std::vector<relpipe::writer::AttributeMetadata> currentWriterMetadata; |
60 std::vector<relpipe::writer::AttributeMetadata> currentWriterMetadata; |
61 HeaderDefinition requestHeader; |
61 HeaderDefinition requestHeader; |
|
62 HTTPClient::Request request; |
62 std::vector<HeaderDefinition> requestHeaders; |
63 std::vector<HeaderDefinition> requestHeaders; |
63 std::vector<relpipe::common::type::StringX> responseHeaders; |
64 std::vector<relpipe::common::type::StringX> responseHeaders; |
64 size_t currentAttributeIndex = 0; |
65 size_t currentAttributeIndex = 0; |
65 size_t currentRecordNumber = 1; |
66 size_t currentRecordNumber = 1; |
66 |
67 |
77 |
78 |
78 responseHeaders.clear(); |
79 responseHeaders.clear(); |
79 } |
80 } |
80 } |
81 } |
81 |
82 |
|
83 HTTPClient::Method parseMethod(const relpipe::common::type::StringX& value) { |
|
84 if (value.size() == 0) return HTTPClient::Method::GET; |
|
85 else if (value == L"GET") return HTTPClient::Method::GET; |
|
86 else if (value == L"HEAD") return HTTPClient::Method::HEAD; |
|
87 else if (value == L"POST") return HTTPClient::Method::POST; |
|
88 else if (value == L"PUT") return HTTPClient::Method::PUT; |
|
89 else if (value == L"DELETE") return HTTPClient::Method::DELETE; |
|
90 else if (value == L"PATCH") return HTTPClient::Method::PATCH; |
|
91 else throw std::invalid_argument("Unsupported HTTP method: " + convertor.to_bytes(value)); |
|
92 } |
|
93 |
82 public: |
94 public: |
83 |
95 |
84 HTTPHandler(shared_ptr<relpipe::writer::RelationalWriter> relationalWriter, Configuration configuration) : relationalWriter(relationalWriter), configuration(configuration) { |
96 HTTPHandler(shared_ptr<relpipe::writer::RelationalWriter> relationalWriter, Configuration configuration) : relationalWriter(relationalWriter), configuration(configuration) { |
85 } |
97 } |
86 |
98 |
90 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
102 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
91 writeHeaders(); // from previous relation |
103 writeHeaders(); // from previous relation |
92 |
104 |
93 currentRelationName = name; |
105 currentRelationName = name; |
94 currentReaderMetadata = attributes; |
106 currentReaderMetadata = attributes; |
|
107 currentAttributeIndex = 0; |
95 |
108 |
96 if (currentRelationName == L"header") { |
109 if (currentRelationName == L"header") { |
97 // TODO: analyze header attributes |
110 // TODO: analyze header attributes |
98 } else if (currentRelationName == L"request") { |
111 } else if (currentRelationName == L"request") { |
99 relationalWriter->startRelation(L"response",{ |
112 relationalWriter->startRelation(L"response",{ |
128 } |
141 } |
129 |
142 |
130 } |
143 } |
131 |
144 |
132 void requestAttribute(const relpipe::common::type::StringX& value) { |
145 void requestAttribute(const relpipe::common::type::StringX& value) { |
133 std::shared_ptr<HTTPClient> http(HTTPClient::open()); |
146 if (currentReaderMetadata[currentAttributeIndex].getAttributeName() == L"url") request.url = convertor.to_bytes(value); |
|
147 else if (currentReaderMetadata[currentAttributeIndex].getAttributeName() == L"method") request.method = parseMethod(value); |
|
148 else throw std::invalid_argument("Unsupported attribute in the header relation: " + convertor.to_bytes(currentReaderMetadata[currentAttributeIndex].getAttributeName() + L" = " + value)); |
134 |
149 |
135 HTTPClient::Request request; |
150 currentAttributeIndex++; |
136 request.method = HTTPClient::Method::GET; |
|
137 request.url = convertor.to_bytes(value); |
|
138 |
151 |
139 // TODO: set request headers from the "header" relation |
152 if (currentAttributeIndex % currentReaderMetadata.size() == 0) { |
140 // request.headers.push_back("Authorization"); |
153 currentAttributeIndex = 0; |
141 // request.headers.push_back("Basic YWhvajpoZXNsbw=="); |
154 std::shared_ptr<HTTPClient> http(HTTPClient::open()); |
142 // request.headers.push_back("Accept"); |
155 |
143 // request.headers.push_back("application/xml"); |
156 for (const HeaderDefinition& h : requestHeaders) { |
144 // request.headers.push_back("User-Agent"); |
157 request.headers.push_back(convertor.to_bytes(h.name)); |
145 // request.headers.push_back("curl/7.58.0"); |
158 request.headers.push_back(convertor.to_bytes(h.value)); |
146 for (const HeaderDefinition& h : requestHeaders) { |
159 } |
147 request.headers.push_back(convertor.to_bytes(h.name)); |
160 |
148 request.headers.push_back(convertor.to_bytes(h.value)); |
161 std::string body; |
|
162 relpipe::common::type::Integer responseCode = -1; |
|
163 |
|
164 try { |
|
165 HTTPClient::Response response = http->exchange(request); |
|
166 responseCode = response.responseCode; |
|
167 body = response.body; |
|
168 |
|
169 for (size_t i = 0; i < response.headers.size(); i += 2) { |
|
170 responseHeaders.push_back(convertor.from_bytes(request.url)); |
|
171 responseHeaders.push_back(convertor.from_bytes(response.headers[i])); |
|
172 responseHeaders.push_back(convertor.from_bytes(response.headers[i + 1])); |
|
173 } |
|
174 } catch (const HTTPClient::Exception& e) { |
|
175 body = e.getFullMessage(); |
|
176 // TODO: move error message into separate attribute? |
|
177 } |
|
178 |
|
179 relationalWriter->writeAttribute(convertor.from_bytes(request.url)); |
|
180 relationalWriter->writeAttribute(convertor.from_bytes(body)); |
|
181 relationalWriter->writeAttribute(&responseCode, typeid (responseCode)); |
|
182 |
|
183 request = HTTPClient::Request(); |
149 } |
184 } |
150 |
|
151 std::string body; |
|
152 relpipe::common::type::Integer responseCode = -1; |
|
153 |
|
154 try { |
|
155 HTTPClient::Response response = http->exchange(request); |
|
156 responseCode = response.responseCode; |
|
157 body = response.body; |
|
158 |
|
159 for (size_t i = 0; i < response.headers.size(); i += 2) { |
|
160 responseHeaders.push_back(convertor.from_bytes(request.url)); |
|
161 responseHeaders.push_back(convertor.from_bytes(response.headers[i])); |
|
162 responseHeaders.push_back(convertor.from_bytes(response.headers[i + 1])); |
|
163 } |
|
164 } catch (const HTTPClient::Exception& e) { |
|
165 body = e.getFullMessage(); |
|
166 // TODO: move error message into separate attribute? |
|
167 } |
|
168 |
|
169 relationalWriter->writeAttribute(value); |
|
170 relationalWriter->writeAttribute(convertor.from_bytes(body)); |
|
171 relationalWriter->writeAttribute(&responseCode, typeid (responseCode)); |
|
172 } |
185 } |
173 |
186 |
174 public: |
187 public: |
175 |
188 |
176 void endOfPipe() { |
189 void endOfPipe() { |