From 96df6ffe634d5a3d17f5c37d9c12583c89bea065 Mon Sep 17 00:00:00 2001
From: Patrick Wolf
Date: Tue, 22 Apr 2025 20:41:20 -0700
Subject: [PATCH 1/4] Update dirfs.py: propagate transaction context to underlying filesystem

DirFileSystem now delegates its `transaction` and `transaction_type` to
the wrapped filesystem, ensuring that `with fs.transaction:` on a
`dir://` instance correctly toggles the underlying `file://`
transaction. This restores atomic write semantics when using the
directory wrapper.

Closes #1823
---
 fsspec/implementations/dirfs.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/fsspec/implementations/dirfs.py b/fsspec/implementations/dirfs.py
index c0623b82f..4fe5aa560 100644
--- a/fsspec/implementations/dirfs.py
+++ b/fsspec/implementations/dirfs.py
@@ -12,6 +12,19 @@ class DirFileSystem(AsyncFileSystem):
 
     protocol = "dir"
 
+    # ----------------------------------------------------------------
+    # Transaction delegation: use the wrapped FS’s transaction
+    transaction_type = property(lambda self: self.fs.transaction_type)
+
+    @property
+    def transaction(self):
+        """
+        Delegate `with fs.transaction:` to the underlying filesystem
+        so that dir:// writes participate in the base FS’s transaction.
+        """
+        return self.fs.transaction
+    # ----------------------------------------------------------------
+
     def __init__(
         self,
         path=None,

From af8e0aca8f95685c885985251a53cffb8b2a3e94 Mon Sep 17 00:00:00 2001
From: Patrick Wolf
Date: Thu, 24 Apr 2025 12:49:26 -0700
Subject: [PATCH 2/4] Update test_dirfs.py - DirFileSystem transaction context propagation

This test verifies that the fix for issue #1823 works correctly by
ensuring that DirFileSystem properly propagates its transaction context
to the underlying filesystem. When a transaction is started on a
DirFileSystem, both the wrapper and the wrapped filesystem should have
their _intrans flags set, allowing writes to be deferred until the
transaction is committed.

Test confirms that:
- Both filesystems have _intrans=True inside transaction
- Files do not appear on disk until transaction commits
- Files appear correctly after transaction commit
---
 fsspec/implementations/tests/test_dirfs.py | 37 ++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/fsspec/implementations/tests/test_dirfs.py b/fsspec/implementations/tests/test_dirfs.py
index f58969406..98c64a44a 100644
--- a/fsspec/implementations/tests/test_dirfs.py
+++ b/fsspec/implementations/tests/test_dirfs.py
@@ -1,5 +1,7 @@
+import os
+
 import pytest
 
 from fsspec.asyn import AsyncFileSystem
 from fsspec.implementations.dirfs import DirFileSystem
+from fsspec.implementations.local import LocalFileSystem
 from fsspec.spec import AbstractFileSystem
@@ -615,3 +617,38 @@ def test_from_url(m):
     assert fs.ls("", False) == ["file"]
     assert fs.ls("", True)[0]["name"] == "file"
     assert fs.cat("file") == b"data"
+
+
+def test_dirfs_transaction_propagation(tmpdir):
+    # Setup
+    path = str(tmpdir)
+    base = LocalFileSystem()
+    fs = DirFileSystem(path=path, fs=base)
+
+    # Test file path
+    test_file = "transaction_test.txt"
+    full_path = os.path.join(path, test_file)
+
+    # Before transaction, both filesystems should have _intrans=False
+    assert not base._intrans
+    assert not fs._intrans
+
+    # Enter transaction and write file
+    with fs.transaction:
+        # After fix, both filesystems should have _intrans=True
+        assert base._intrans, "Base filesystem transaction flag not set"
+        assert fs._intrans, "DirFileSystem transaction flag not set"
+
+        # Write to file
+        with fs.open(test_file, "wb") as f:
+            f.write(b"hello world")
+
+        # Check if file exists on disk - it should not until transaction commits
+        assert not os.path.exists(full_path), "File exists during transaction, not deferred to temp file"
+
+    # After transaction commits, file should exist
+    assert os.path.exists(full_path), "File not created after transaction commit"
+
+    # Verify content
+    with open(full_path, "rb") as f:
+        assert f.read() == b"hello world"

From efb1bae1a4d94139100e519a5b9dad9b263156b4 Mon Sep 17 00:00:00 2001
From: Patrick Wolf
Date: Thu, 24 Apr 2025 12:50:47 -0700
Subject: [PATCH 3/4] Update test_cached.py - transaction propagation issue

This test identifies that CachingFileSystem has an issue similar to the
one fixed for DirFileSystem in #1824. When a transaction is started on a
CachingFileSystem, the transaction context is not propagated to the
underlying filesystem, causing files to be written immediately rather
than deferred.

Test demonstrates that:
- CachingFileSystem sets _intrans=True inside transaction
- But wrapped filesystem remains with _intrans=False
- Files are written immediately during transaction instead of being deferred

This test documents the current behavior and could help guide a future fix.
---
 fsspec/implementations/tests/test_cached.py | 45 ++++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)

diff --git a/fsspec/implementations/tests/test_cached.py b/fsspec/implementations/tests/test_cached.py
index c9222d5b5..f1d1f8476 100644
--- a/fsspec/implementations/tests/test_cached.py
+++ b/fsspec/implementations/tests/test_cached.py
@@ -19,7 +19,10 @@
     LocalTempFile,
     WholeFileCacheFileSystem,
 )
-from fsspec.implementations.local import make_path_posix
+from fsspec.implementations.local import (
+    LocalFileSystem,
+    make_path_posix,
+)
 from fsspec.implementations.zip import ZipFileSystem
 from fsspec.tests.conftest import win
 
@@ -1337,3 +1340,43 @@ def test_filecache_write(tmpdir, m):
 
     assert m.cat(fn) == data.encode()
     assert fs.cat(fn) == data.encode()
+
+
+def test_cachingfs_transaction_missing_propagation(tmpdir):
+    # Setup temp directories
+    storage_dir = str(tmpdir / "storage")
+    cache_dir = str(tmpdir / "cache")
+    os.mkdir(storage_dir)
+    os.mkdir(cache_dir)
+
+    # Create a local filesystem and wrap it with CachingFileSystem
+    base = LocalFileSystem()
+    cachefs = CachingFileSystem(
+        fs=base,
+        cache_storage=cache_dir,
+        cache_check=0,
+        same_names=True,
+    )
+
+    # Test file path
+    test_file = os.path.join(storage_dir, "cache_transaction_test.txt")
+
+    # Before transaction, both filesystems should have _intrans=False
+    assert not base._intrans
+    assert not cachefs._intrans
+
+    # Enter transaction and write file
+    with cachefs.transaction:
+        # CachingFileSystem's transaction flag is set
+        assert cachefs._intrans
+
+        # But the base filesystem's transaction flag is not - this is the bug
+        assert not base._intrans, "Base transaction flag unexpectedly propagated"
+
+        # Write to file
+        with cachefs.open(test_file, "wb") as f:
+            f.write(b"cached data")
+
+        # Check if file exists on disk - bug: it will exist immediately since
+        # transaction context was not propagated
+        assert os.path.exists(test_file), "File unexpectedly deferred in transaction"

From e7121179df2c55faade1960358082d17a7c74a31 Mon Sep 17 00:00:00 2001
From: Patrick Wolf
Date: Sun, 4 May 2025 22:54:36 -0700
Subject: [PATCH 4/4] Update dirfs.py - Added missing functions to propagate transactions to base class

def start_transaction(self):
def end_transaction(self):
def invalidate_cache(self, path=None):
---
 fsspec/implementations/dirfs.py | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/fsspec/implementations/dirfs.py b/fsspec/implementations/dirfs.py
index 4fe5aa560..af0c39785 100644
--- a/fsspec/implementations/dirfs.py
+++ b/fsspec/implementations/dirfs.py
@@ -23,6 +23,27 @@ def transaction(self):
         so that dir:// writes participate in the base FS’s transaction.
         """
         return self.fs.transaction
+
+    def start_transaction(self):
+        """Start a transaction on the base filesystem and return the transaction."""
+        if hasattr(self.fs, "start_transaction"):
+            self.fs.start_transaction()
+        return super().start_transaction()  # Base class handles self._intrans
+
+    def end_transaction(self):
+        """End a transaction and propagate to the base filesystem."""
+        if hasattr(self.fs, "end_transaction"):
+            self.fs.end_transaction()
+        super().end_transaction()  # Base class handles self._intrans
+
+    def invalidate_cache(self, path=None):
+        """
+        Discard any cached directory information for ``path`` (relative to
+        this wrapper) here and, as a full path, in the base filesystem.
+        """
+        if hasattr(self.fs, "invalidate_cache"):
+            self.fs.invalidate_cache(None if path is None else self._join(path))
+        super().invalidate_cache(path)
     # ----------------------------------------------------------------
 
     def __init__(