src/StreamletAttributeFinder.h
branchv_0
changeset 35 926eb93c302f
parent 32 bccda5688d71
child 45 f466b4c7d9b1
--- a/src/StreamletAttributeFinder.h	Sat Jan 11 22:11:01 2020 +0100
+++ b/src/StreamletAttributeFinder.h	Sat Jan 11 22:40:52 2020 +0100
@@ -50,13 +50,19 @@
 
 protected:
 
-	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
+	void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
+		AttributeFinder::startFile(file, fileRaw, exists);
+		if (exists) {
+			for (auto subProcess : subProcesses) {
+				subProcess.second->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
+				subProcess.second->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
+			}
+		}
+	}
+
+	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
 		// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
 		if (field.group == RequestedField::GROUP_STREAMLET) {
-
-			subProcesses[field.id]->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
-			subProcesses[field.id]->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
-
 			for (auto metadata : cachedMetadata[field.id]) {
 				SubProcess::Message m = subProcesses[field.id]->read();
 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);