Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions base/src/main/java/org/gorpipe/base/config/PropsHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.gorpipe.base.config;

/**
 * Helper class for operations related to system properties.
 */
public class PropsHelper {
    /**
     * Parse a boolean system property.
     *
     * <p>The property value is compared case-insensitively against {@code "true"} and
     * {@code "false"}. Any other value, an unset property, or a {@code null}/empty property
     * name yields {@code defValue}.
     *
     * @param name property name
     * @param defValue default value
     * @return boolean value of the property, or {@code defValue} if the property is not set or
     *         has an invalid value.
     */
    public static boolean getBoolean(String name, boolean defValue) {
        // System.getProperty rejects null and empty keys; treat both as "not set".
        if (name == null || name.isEmpty()) {
            return defValue;
        }

        String value = System.getProperty(name);
        if (value == null) {
            // Boolean.parseBoolean(null) would return false here and silently discard the default.
            return defValue;
        }
        if ("true".equalsIgnoreCase(value)) {
            return true;
        }
        if ("false".equalsIgnoreCase(value)) {
            return false;
        }
        // Neither "true" nor "false": an invalid value falls back to the default.
        return defValue;
    }
}
4 changes: 3 additions & 1 deletion gortools/src/main/scala/gorsat/Commands/Write.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class Write extends CommandInfo("WRITE",
context.getSession.getProjectContext.getFileReader.getCommonRoot, null, null, true);
// Infer the full file name from the link (and default locations)
LinkFileUtil.inferDataFileNameFromLinkFile(
context.getSession.getProjectContext.getFileReader.resolveDataSource(linkSourceRef).asInstanceOf[StreamSource], linkMetaInfo.linkFileMeta);
context.getSession.getProjectContext.getFileReader.resolveDataSource(linkSourceRef).asInstanceOf[StreamSource],
linkMetaInfo.linkFileMeta,
context.getSession.getProjectContext.getFileReader);
} else {
fileName
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.util.concurrent.UncheckedExecutionException;

import org.gorpipe.base.config.PropsHelper;
import org.gorpipe.exceptions.GorResourceException;
import org.gorpipe.gor.driver.meta.SourceReference;
import org.gorpipe.gor.driver.providers.stream.StreamUtils;
Expand Down Expand Up @@ -59,11 +60,14 @@ public abstract class LinkFile {

private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(LinkFile.class);

// Approx max size of link file content to read or write. Stopp adding lines if exceeded. Dont load if twice this size.
// Approx max size of link file content to read or write. Stop adding lines if exceeded. Don't load if twice this size.
public static final int LINK_FILE_MAX_SIZE = Integer.parseInt(System.getProperty("gor.driver.link.maxfilesize", "10000"));
private static final boolean USE_LINK_CACHE = Boolean.parseBoolean(System.getProperty("gor.driver.link.cache", "true"));
private static final boolean USE_LINK_CACHE_SESSION = Boolean.parseBoolean(System.getProperty("gor.driver.link.cache.session", "true"));

public static final String LINK_FILE_VALIDATE_LOAD = "gor.driver.link.validate.load";
public static final String LINK_FILE_VALIDATE_SAVE = "gor.driver.link.validate.save";

private static final Cache<StreamSource, String> staticLinkCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES).build();
Expand Down Expand Up @@ -128,6 +132,10 @@ protected LinkFile(StreamSource source, LinkFileMeta meta, String content) {
this.source = source;
this.meta = meta;
this.entries = parseEntries(content);

if (PropsHelper.getBoolean(LINK_FILE_VALIDATE_LOAD, false)) {
validate();
}
}

public LinkFileMeta getMeta() {
Expand Down Expand Up @@ -281,6 +289,10 @@ public void save(long timestamp, FileReader reader) {


private void save(OutputStream os, long timestamp, FileReader reader) {
if (PropsHelper.getBoolean(LINK_FILE_VALIDATE_SAVE, false)) {
validate();
}

meta.setProperty(LinkFileMeta.HEADER_SERIAL_KEY, Integer.toString(Integer.parseInt(meta.getProperty(LinkFileMeta.HEADER_SERIAL_KEY, "0")) + 1));

var currentTimestamp = timestamp > 0 ? timestamp : System.currentTimeMillis();
Expand Down Expand Up @@ -309,6 +321,24 @@ private void save(OutputStream os, long timestamp, FileReader reader) {

protected abstract List<LinkFileEntry> parseEntries(String content);

/**
* Check internal consistency of this link file.
*
* <p>Used by {@link #validate()}, which throws a {@code GorResourceException} listing the
* returned strings when the list is not empty.
*
* <p>NOTE(review): {@code validate()} is also invoked from the constructor when load validation is
* enabled, so implementations presumably must rely only on state that is already initialised at
* that point - confirm for each subclass.
*
* @return list of human-readable violation strings; empty means the file is clean
*/
public abstract List<String> checkIntegrity();

/**
 * Run the integrity check on this link file and fail if it reports anything.
 *
 * @throws GorResourceException if {@link #checkIntegrity()} returns a non-empty list; the message
 *         holds the violation count followed by one violation per line, and the exception is
 *         created with the full path of the link file source.
 */
public void validate() {
    List<String> problems = checkIntegrity();
    if (problems.isEmpty()) {
        return;
    }

    String details = String.join("\n", problems);
    String message = "Link file integrity check failed with %d violation(s):\n%s"
            .formatted(problems.size(), details);
    throw new GorResourceException(message, source.getFullPath());
}

// Check if we can garbage collect entries between fromIndex and toIndex (inclusive).

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import gorsat.Commands.CommandParseUtilities;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.gorpipe.exceptions.GorResourceException;
import org.gorpipe.gor.driver.GorDriverConfig;
import org.gorpipe.gor.driver.providers.stream.sources.StreamSource;
import org.gorpipe.gor.model.FileReader;
Expand Down Expand Up @@ -30,6 +32,22 @@ public class LinkFileUtil {
* @return the data file path
*/
public static String inferDataFileNameFromLinkFile(StreamSource linkSource, String linkFileMeta) throws IOException {
// Delegate to the three-argument overload without a FileReader; a null reader skips the
// check that the inferred target does not already exist.
return inferDataFileNameFromLinkFile(linkSource, linkFileMeta, null);
}

/**
* Infer the data file name from the link file name.
* When a {@code fileReader} is supplied and the link file is lifecycle-managed, this method also
* verifies that the inferred target path does not already exist — overwriting a managed file
* would silently corrupt the version history stored in the link file.
*
* @param linkSource the link file path with the link extension
* @param linkFileMeta additional link file meta data
* @param fileReader file reader used to check target existence; may be {@code null} to skip the check
* @return the data file path
* @throws GorResourceException if the link is managed and the inferred target already exists
*/
public static String inferDataFileNameFromLinkFile(StreamSource linkSource, String linkFileMeta, FileReader fileReader) throws IOException {
if (linkSource == null || Strings.isNullOrEmpty(linkSource.getFullPath())) {
throw new IllegalArgumentException("Link file path is null or empty. Can not infer data file name.");
}
Expand Down Expand Up @@ -68,12 +86,21 @@ public static String inferDataFileNameFromLinkFile(StreamSource linkSource, Stri

var fileName = PathUtils.getFileName(linkSource.getFullPath());
var extraFolder = PathUtils.removeExtensions(fileName);
var uniqueFileName = PathUtils.injectStringIntoFileName(fileName, Integer.toString(link.getSerial() + 1));
var uniqueIdPart = (link.getSerial() + 1) + "-" + RandomStringUtils.insecure().nextAlphanumeric(3);
var uniqueFileName = PathUtils.injectStringIntoFileName(fileName, uniqueIdPart);
var resolvedPath = PathUtils.resolve(PathUtils.resolve(dataFileParentPath, extraFolder), uniqueFileName);

log.debug("Inferred file name for link file {} is {}", linkSource.getFullPath(), resolvedPath);

log.warn("Inferred file name for link file {} is {}", linkSource.getFullPath(),
PathUtils.resolve(PathUtils.resolve(dataFileParentPath, extraFolder), uniqueFileName));
if (fileReader != null && link.getMeta().getPropertyBool(LinkFileMeta.HEADER_DATA_LIFECYCLE_MANAGED_KEY, false)) {
if (fileReader.exists(resolvedPath)) {
throw new GorResourceException(
"Managed link file target already exists, overwrite would corrupt version history: " + resolvedPath,
resolvedPath);
}
}

return PathUtils.resolve(PathUtils.resolve(dataFileParentPath, extraFolder), uniqueFileName);
return resolvedPath;
}

private static Pattern linkPattern = Pattern.compile(".* -link ([^\\s]*) ?.*", Pattern.CASE_INSENSITIVE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.gorpipe.gor.driver.providers.stream.sources.StreamSource;
import org.gorpipe.gor.model.FileReader;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -35,6 +35,11 @@ public LinkFile appendEntry(String link, String md5, String info, FileReader rea
return this;
}

/**
 * Integrity check for this link file implementation.
 *
 * @return always an empty list; this implementation reports no violations
 */
@Override
public List<String> checkIntegrity() {
    final List<String> noViolations = Collections.emptyList();
    return noViolations;
}

public static String getDefaultMetaContent() {
return String.format("""
## VERSION = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

import org.gorpipe.gor.driver.providers.stream.sources.StreamSource;
import org.gorpipe.gor.model.FileReader;
import org.gorpipe.util.Strings;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import static org.gorpipe.gor.driver.linkfile.LinkFileMeta.HEADER_DATA_LIFECYCLE_MANAGED_KEY;

/**
* Link file format, version 1.
*/
Expand Down Expand Up @@ -41,7 +46,7 @@ protected List<LinkFileEntry> parseEntries(String content) {
public LinkFile appendEntry(String link, String md5, String info, FileReader reader) {
var latestEntry = getLatestEntry();
var entry = new LinkFileEntryV1(link, System.currentTimeMillis(), md5, latestEntry != null ? latestEntry.serial() + 1 : 1, info);
validateEntry(entry, reader);
validateEntry(entry, latestEntry, reader);
entry = handleRepeatedEntries(entry, reader);
if (entry != null) {
entries.add(entry);
Expand All @@ -53,15 +58,25 @@ public LinkFile appendEntry(String link, String md5, String info, FileReader rea
* Validate the entry to ensure it is of the correct type, format and does not violate integrity of the link file.
* @param entry the link file entry to validate
*/
private void validateEntry(LinkFileEntry entry, FileReader reader) {
private void validateEntry(LinkFileEntry entry, LinkFileEntry latestEntry, FileReader reader) {
if (!(entry instanceof LinkFileEntryV1)) {
throw new IllegalArgumentException("Invalid entry type: " + entry.getClass().getName());
}
if (entry.url() == null || entry.url().isEmpty()) {
throw new IllegalArgumentException("Entry URL cannot be null or empty");
}
if (!allowOverwriteOfTargets) {
// Only applies to non managed data.
if (latestEntry != null && entry.serial() > 0 && latestEntry.serial() > 0
&& entry.serial() <= latestEntry.serial()) {
throw new IllegalArgumentException(
"Entry serial %d must be greater than latest serial %d".formatted(entry.serial(), latestEntry.serial()));
}
if (latestEntry != null && entry.timestamp() > 0 && latestEntry.timestamp() > 0
&& entry.timestamp() < latestEntry.timestamp()) {
log.warn("Entry timestamp {} is before latest entry timestamp {} in link file {}",
entry.timestamp(), latestEntry.timestamp(), source.getFullPath());
}
boolean isManaged = meta.getPropertyBool(HEADER_DATA_LIFECYCLE_MANAGED_KEY, false);
if (!allowOverwriteOfTargets || isManaged) {
for (LinkFileEntry existingEntry : entries) {
if (existingEntry.url().equals(entry.url()) && !canReuseEntryWithSameUrl(existingEntry, entry, reader)) {
throw new IllegalArgumentException("Duplicate entry URL: " + entry.url());
Expand All @@ -71,12 +86,12 @@ private void validateEntry(LinkFileEntry entry, FileReader reader) {
}

private boolean canReuseEntryWithSameUrl(LinkFileEntry oldEntry, LinkFileEntry newEntry, FileReader reader) {
// We can reuse an entry (same url) if the entries have the same underlying file, as if not the integrity of the
// versioned link file is violated (as the new entry file overwrites the old entry file, but the old entry
// is still in the link file history).
// We can reuse an entry (same url) if the entries have the same underlying file (same MD5), as if not the
// integrity of the versioned link file is violated (as the new entry file overwrites the old entry file,
// but the old entry is still in the link file history).
// BUT haven't we already ruined the integrity when we enter here!?

if ((oldEntry.md5() != null && newEntry.md5() != null)) {
if (!Strings.isNullOrEmpty(oldEntry.md5()) && !Strings.isNullOrEmpty(newEntry.md5())) {
// Use md5 if available.
return oldEntry.md5().equals(newEntry.md5());
} else {
Expand Down Expand Up @@ -114,8 +129,9 @@ private LinkFileEntryV1 handleRepeatedEntries(LinkFileEntryV1 newEntry, FileRead
}

private LinkFileEntry findExistingEntryByMD5(LinkFileEntry entry) {
if (Strings.isNullOrEmpty(entry.md5())) return null;
for (LinkFileEntry existingEntry : entries) {
if (existingEntry.md5().equals(entry.md5())) {
if (!Strings.isNullOrEmpty(existingEntry.md5()) && existingEntry.md5().equals(entry.md5())) {
return existingEntry;
}
}
Expand Down Expand Up @@ -149,6 +165,38 @@ private void cleanEntryDataIfManaged(LinkFileEntry candiateEntry, FileReader rea
}
}

/**
 * Walk the entries in order and collect every inconsistency found.
 *
 * <p>Three rules are checked, each against the most recent entry that carried a usable value:
 * <ul>
 *   <li>positive serials must be strictly increasing;</li>
 *   <li>positive timestamps must not go backwards;</li>
 *   <li>a URL that reappears with a non-empty MD5 must match (ignoring case) the MD5 last
 *       recorded for that URL.</li>
 * </ul>
 * Entries with a non-positive serial or timestamp, or with a null/empty MD5, are skipped for the
 * corresponding rule and do not update the value later entries are compared with.
 *
 * @return list of human-readable violation strings; empty means no violation was found
 */
@Override
public List<String> checkIntegrity() {
    final List<String> problems = new ArrayList<>();
    final HashMap<String, String> lastMd5ByUrl = new HashMap<>();
    int lastSerial = -1;
    long lastTimestamp = -1;

    for (LinkFileEntry current : entries) {
        final int serial = current.serial();
        if (serial > 0) {
            if (lastSerial > 0 && serial <= lastSerial) {
                problems.add("Entry serial %d is not greater than previous serial %d (url: %s)"
                        .formatted(serial, lastSerial, current.url()));
            }
            lastSerial = serial;
        }

        final long timestamp = current.timestamp();
        if (timestamp > 0) {
            if (lastTimestamp > 0 && timestamp < lastTimestamp) {
                problems.add("Entry timestamp %d is before previous timestamp %d (url: %s)"
                        .formatted(timestamp, lastTimestamp, current.url()));
            }
            lastTimestamp = timestamp;
        }

        final String md5 = current.md5();
        if (Strings.isNullOrEmpty(md5)) {
            continue;
        }
        // put() hands back the MD5 previously stored for this URL (or null), and records the new one.
        final String earlierMd5 = lastMd5ByUrl.put(current.url(), md5);
        if (earlierMd5 != null && !earlierMd5.equalsIgnoreCase(md5)) {
            problems.add("URL '%s' appears with different MD5 values (%s vs %s)"
                    .formatted(current.url(), earlierMd5, md5));
        }
    }
    return problems;
}

private void checkDefaultMeta() {
if (!meta.getVersion().equals(VERSION)) {
meta.loadAndMergeMeta(getDefaultMetaContent());
Expand Down
Loading
Loading