propagate the relation name to the finders and streamlets v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sat, 11 Jan 2020 19:10:01 +0100
branchv_0
changeset 32 bccda5688d71
parent 31 c64e1588f428
child 33 f9cada1d46a4
propagate the relation name to the finders and streamlets
src/AttributeFinder.h
src/FileAttributeFinder.h
src/FilesystemCommand.h
src/StreamletAttributeFinder.h
src/XattrAttributeFinder.h
--- a/src/AttributeFinder.h	Sat Jan 11 18:13:30 2020 +0100
+++ b/src/AttributeFinder.h	Sat Jan 11 19:10:01 2020 +0100
@@ -43,7 +43,7 @@
 	 * @param writer
 	 * @param field
 	 */
-	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) = 0;
+	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) = 0;
 
 	/**
 	 * Writes empty attribute(s) in case of non-existent file or an error. 
@@ -51,9 +51,9 @@
 	 * @param writer
 	 * @param field
 	 */
-	virtual void writeEmptyField(RelationalWriter* writer, const RequestedField& field) {
+	virtual void writeEmptyField(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) {
 		// TODO: better handling of null values (when null values are supported by the format specification)
-		for (AttributeMetadata m : toMetadata(writer, field)) {
+		for (AttributeMetadata m : toMetadata(writer, relationName, field)) {
 			switch (m.typeId) {
 				case TypeId::BOOLEAN:
 					writer->writeAttribute(L"false");
@@ -76,10 +76,11 @@
 	 * Single requested fields might generate multiple attributes in the relation.
 	 * But usually it is 1:1.
 	 * @param writer can be used for TypeId coversion from string_t
+	 * @param relationName default one or set by the user
 	 * @param field requested field from the user (usually from CLI arguments)
 	 * @return attribute metadata to be used in the RelationalWriter.startRelation()
 	 */
-	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const RequestedField& field) = 0;
+	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) = 0;
 
 	/**
 	 * Writing of the record for current file is starting.
@@ -93,9 +94,9 @@
 		currentFileExists = exists;
 	}
 
-	virtual void writeField(RelationalWriter* writer, const RequestedField& field) {
-		if (currentFileExists) writeFieldOfExistingFile(writer, field);
-		else writeEmptyField(writer, field);
+	virtual void writeField(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) {
+		if (currentFileExists) writeFieldOfExistingFile(writer, relationName, field);
+		else writeEmptyField(writer, relationName, field);
 	}
 
 	/**
--- a/src/FileAttributeFinder.h	Sat Jan 11 18:13:30 2020 +0100
+++ b/src/FileAttributeFinder.h	Sat Jan 11 19:10:01 2020 +0100
@@ -88,7 +88,7 @@
 
 protected:
 
-	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) override {
+	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
 		if (field.group == RequestedField::GROUP_FILE) {
 			for (string_t alias : field.getAliases()) {
 				if (field.name == FIELD_NAME) {
@@ -145,7 +145,7 @@
 	static const string_t FIELD_GROUP;
 	static const string_t FIELD_CONTENT;
 
-	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const RequestedField& field) override {
+	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
 		if (field.group == RequestedField::GROUP_FILE) {
 			vector<AttributeMetadata> metadata;
 			for (string_t alias : field.getAliases()) {
@@ -158,13 +158,13 @@
 		}
 	}
 
-	void writeField(RelationalWriter* writer, const RequestedField& field) override {
+	void writeField(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
 		if (currentFileExists
 				|| field.name == FIELD_PATH_ORIGINAL
 				|| (fs::is_symlink(currentFile) && (field.name == FIELD_SYMLINK_TARGET || field.name == FIELD_TYPE)))
-			writeFieldOfExistingFile(writer, field);
+			writeFieldOfExistingFile(writer, relationName, field);
 		else
-			writeEmptyField(writer, field);
+			writeEmptyField(writer, relationName, field);
 	}
 
 	void endFile() override {
--- a/src/FilesystemCommand.h	Sat Jan 11 18:13:30 2020 +0100
+++ b/src/FilesystemCommand.h	Sat Jan 11 19:10:01 2020 +0100
@@ -77,14 +77,15 @@
 	void process(std::istream& input, std::ostream& output, Configuration& configuration) {
 		std::shared_ptr<RelationalWriter> writer(Factory::create(output));
 
+		string_t relationName = configuration.relation.empty() ? L"filesystem" : configuration.relation;
+
 		std::vector<AttributeMetadata> attributesMetadata;
 		for (RequestedField field : configuration.fields) {
 			AttributeFinder* finder = attributeFinders[field.group];
-			if (finder) for (AttributeMetadata m : finder->toMetadata(writer.get(), field)) attributesMetadata.push_back(m);
+			if (finder) for (AttributeMetadata m : finder->toMetadata(writer.get(), relationName, field)) attributesMetadata.push_back(m);
 			else throw RelpipeWriterException(L"Unsupported field group: " + field.group);
 		}
 
-		string_t relationName = configuration.relation.empty() ? L"filesystem" : configuration.relation;
 		writer->startRelation(relationName, attributesMetadata, true);
 
 
@@ -102,7 +103,7 @@
 
 			for (RequestedField field : configuration.fields) {
 				AttributeFinder* finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata
-				finder->writeField(writer.get(), field);
+				finder->writeField(writer.get(), relationName, field);
 			}
 
 			for (auto& finder : attributeFinders) finder.second->endFile();
--- a/src/StreamletAttributeFinder.h	Sat Jan 11 18:13:30 2020 +0100
+++ b/src/StreamletAttributeFinder.h	Sat Jan 11 19:10:01 2020 +0100
@@ -50,7 +50,7 @@
 
 protected:
 
-	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) override {
+	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
 		// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
 		if (field.group == RequestedField::GROUP_STREAMLET) {
 
@@ -73,7 +73,7 @@
 
 	static const string_t SCRIPT_PREFIX;
 
-	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const RequestedField& field) override {
+	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
 		if (field.group == RequestedField::GROUP_STREAMLET) {
 
 			if (cachedMetadata.count(field.id)) {
@@ -96,7 +96,7 @@
 				subProcess->write({StreamletMsg::WAITING_FOR_VERSION});
 				SubProcess::Message versionMessage = subProcess->read();
 				if (versionMessage.code == StreamletMsg::VERSION_ACCEPTED && versionMessage.parameters[0] == version) {
-					subProcess->write({StreamletMsg::RELATION_START});
+					subProcess->write({StreamletMsg::RELATION_START, relationName});
 					subProcess->write({StreamletMsg::INPUT_ATTRIBUTE_METADATA, L"path", L"string"});
 					for (string_t alias : field.getAliases()) subProcess->write({StreamletMsg::OUTPUT_ATTRIBUTE_ALIAS, alias});
 					for (int i = 0; i < field.options.size();) subProcess->write({StreamletMsg::OPTION, field.options[i++], field.options[i++]});
--- a/src/XattrAttributeFinder.h	Sat Jan 11 18:13:30 2020 +0100
+++ b/src/XattrAttributeFinder.h	Sat Jan 11 19:10:01 2020 +0100
@@ -52,7 +52,7 @@
 	}
 protected:
 
-	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) override {
+	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
 		for (string_t alias : field.getAliases()) {
 			// TODO: support also other namespaces through CLI --option namespace someOtherNS
 			string_t xattrName = L"user." + field.name;
@@ -62,7 +62,7 @@
 
 public:
 
-	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const RequestedField& field) override {
+	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
 		if (field.group == RequestedField::GROUP_XATTR) {
 			vector<AttributeMetadata> metadata;
 			for (string_t alias : field.getAliases()) metadata.push_back(AttributeMetadata{alias, TypeId::STRING});