--- a/src/StreamletAttributeFinder.h Sat Jan 11 22:11:01 2020 +0100
+++ b/src/StreamletAttributeFinder.h Sat Jan 11 22:40:52 2020 +0100
@@ -50,13 +50,19 @@
protected:
- virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
+ void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
+ AttributeFinder::startFile(file, fileRaw, exists);
+ if (exists) {
+ for (auto subProcess : subProcesses) {
+ subProcess.second->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
+ subProcess.second->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
+ }
+ }
+ }
+
+ virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
if (field.group == RequestedField::GROUP_STREAMLET) {
-
- subProcesses[field.id]->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
- subProcesses[field.id]->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
-
for (auto metadata : cachedMetadata[field.id]) {
SubProcess::Message m = subProcesses[field.id]->read();
if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);