48 return SCRIPT_PREFIX + field.name; |
48 return SCRIPT_PREFIX + field.name; |
49 } |
49 } |
50 |
50 |
51 protected: |
51 protected: |
52 |
52 |
53 virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override { |
53 void startFile(const fs::path& file, const string& fileRaw, bool exists) override { |
|
54 AttributeFinder::startFile(file, fileRaw, exists); |
|
55 if (exists) { |
|
56 for (auto subProcess : subProcesses) { |
|
57 subProcess.second->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull |
|
58 subProcess.second->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES}); |
|
59 } |
|
60 } |
|
61 } |
|
62 |
|
63 virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override { |
54 // TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock) |
64 // TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock) |
55 if (field.group == RequestedField::GROUP_STREAMLET) { |
65 if (field.group == RequestedField::GROUP_STREAMLET) { |
56 |
|
57 subProcesses[field.id]->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull |
|
58 subProcesses[field.id]->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES}); |
|
59 |
|
60 for (auto metadata : cachedMetadata[field.id]) { |
66 for (auto metadata : cachedMetadata[field.id]) { |
61 SubProcess::Message m = subProcesses[field.id]->read(); |
67 SubProcess::Message m = subProcesses[field.id]->read(); |
62 if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); |
68 if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); |
63 else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); |
69 else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); |
64 } |
70 } |