streamlets: enable parallel processing v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sat, 11 Jan 2020 22:40:52 +0100
branchv_0
changeset 35 926eb93c302f
parent 34 0b9e4af08cc8
child 36 8b97ab3cd9cc
streamlets: enable parallel processing
src/StreamletAttributeFinder.h
--- a/src/StreamletAttributeFinder.h	Sat Jan 11 22:11:01 2020 +0100
+++ b/src/StreamletAttributeFinder.h	Sat Jan 11 22:40:52 2020 +0100
@@ -50,13 +50,19 @@
 
 protected:
 
-	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
+	void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
+		AttributeFinder::startFile(file, fileRaw, exists);
+		if (exists) {
+			for (auto subProcess : subProcesses) {
+				subProcess.second->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
+				subProcess.second->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
+			}
+		}
+	}
+
+	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
 		// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
 		if (field.group == RequestedField::GROUP_STREAMLET) {
-
-			subProcesses[field.id]->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
-			subProcesses[field.id]->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
-
 			for (auto metadata : cachedMetadata[field.id]) {
 				SubProcess::Message m = subProcesses[field.id]->read();
 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);