Utility that helps in file locking - expert tips wanted

Developer https://www.devze.com 2022-12-31 20:24 Source: web
I've written a subclass of file that a) provides methods to conveniently lock it (using fcntl, so it only supports unix, which is however OK for me atm) and b) when reading or writing asserts that the file is appropriately locked.

Now I'm not an expert at such stuff (I've just read one paper [de] about it) and would appreciate some feedback: Is it secure, are there race conditions, are there other things that could be done better … Here is the code:

from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN, LOCK_NB

class LockedFile(file):
    """
    A wrapper around `file` providing locking. Requires a shared lock to read
    and an exclusive lock to write.

    Main differences:
     * Additional methods: lock_ex, lock_sh, unlock
     * Refuse to read when not locked, refuse to write when not locked
       exclusively.
     * mode cannot be `w` since then the file would be truncated before
       it could be locked.

    You have to lock the file yourself, it won't be done for you implicitly.
    Only you know what lock you need.

    Example usage::
        def get_config():
            f = LockedFile(CONFIG_FILENAME, 'r')
            f.lock_sh()
            config = parse_ini(f.read())
            f.close()
            return config

        def set_config(key, value):
            f = LockedFile(CONFIG_FILENAME, 'r+')
            f.lock_ex()
            config = parse_ini(f.read())
            config[key] = value
            f.seek(0)  # truncate() cuts at the current position, so rewind first
            f.truncate()
            f.write(make_ini(config))
            f.close()
    """

    def __init__(self, name, mode='r', *args, **kwargs):
        if 'w' in mode:
            raise ValueError('Cannot open file in `w` mode')

        super(LockedFile, self).__init__(name, mode, *args, **kwargs)

        self.locked = None

    def lock_sh(self, **kwargs):
        """
        Acquire a shared lock on the file. If the file is already locked
        exclusively, do nothing.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        :param nonblocking: Don't wait for the lock to be available.
        """
        if self.locked == 'ex':
            return self.locked  # re-locking as shared would implicitly drop the exclusive lock
        return self._lock(LOCK_SH, **kwargs)

    def lock_ex(self, **kwargs):
        """
        Acquire an exclusive lock on the file.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        :param nonblocking: Don't wait for the lock to be available.
        """
        return self._lock(LOCK_EX, **kwargs)

    def unlock(self):
        """
        Release all locks on the file.
        Flushes if there was an exclusive lock.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        """
        if self.locked == 'ex':
            self.flush()
        return self._lock(LOCK_UN)

    def _lock(self, mode, nonblocking=False):
        flock(self, mode | bool(nonblocking) * LOCK_NB)
        before = self.locked
        self.locked = {LOCK_SH: 'sh', LOCK_EX: 'ex', LOCK_UN: None}[mode]
        return before

    def _assert_read_lock(self):
        assert self.locked, "File is not locked"

    def _assert_write_lock(self):
        assert self.locked == 'ex', "File is not locked exclusively"


    def read(self, *args):
        self._assert_read_lock()
        return super(LockedFile, self).read(*args)

    def readline(self, *args):
        self._assert_read_lock()
        return super(LockedFile, self).readline(*args)

    def readlines(self, *args):
        self._assert_read_lock()
        return super(LockedFile, self).readlines(*args)

    def xreadlines(self, *args):
        self._assert_read_lock()
        return super(LockedFile, self).xreadlines(*args)

    def __iter__(self):
        self._assert_read_lock()
        return super(LockedFile, self).__iter__()

    def next(self):
        self._assert_read_lock()
        return super(LockedFile, self).next()


    def write(self, *args):
        self._assert_write_lock()
        return super(LockedFile, self).write(*args)

    def writelines(self, *args):
        self._assert_write_lock()
        return super(LockedFile, self).writelines(*args)

    def flush(self):
        self._assert_write_lock()
        return super(LockedFile, self).flush()

    def truncate(self, *args):
        self._assert_write_lock()
        return super(LockedFile, self).truncate(*args)

    def close(self):
        self.unlock()
        return super(LockedFile, self).close()

(the example in the docstring is also my current use case for this)
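For readers on Python 3, where the `file` builtin no longer exists, the same read-modify-write use case can be sketched with plain `fcntl.flock` and a context manager. This is only a minimal sketch under assumptions: `locked_open` and `append_line_once` are hypothetical names invented here, not part of the class above, and the code is Unix only.

```python
import fcntl
from contextlib import contextmanager


@contextmanager
def locked_open(path, mode, lock_type):
    """Open `path`, take an flock on it, and release both on exit.

    Hypothetical helper for illustration; not part of LockedFile.
    """
    f = open(path, mode)
    try:
        fcntl.flock(f, lock_type)  # blocks until the lock is granted
        yield f
    finally:
        # close() flushes buffered data first, then closes the descriptor,
        # which drops the flock (assuming the descriptor was not duplicated).
        f.close()


def append_line_once(path, line):
    """Read-modify-write under a single exclusive lock."""
    with locked_open(path, 'r+', fcntl.LOCK_EX) as f:
        lines = f.read().splitlines()
        if line not in lines:
            lines.append(line)
        f.seek(0)      # truncate() cuts at the current position, so rewind first
        f.truncate()
        f.write('\n'.join(lines) + '\n')
```

The lock is held across the read and the write, so two cooperating writers cannot interleave. The file is opened in `r+` rather than `w` for the same reason the class refuses `w`: truncation would happen before the lock is taken.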

Thanks for reading this far, and possibly even answering :)


I am also not an expert, but there is one thing you should change, and a couple others to consider:

First off, using assert this way is a bad idea: if Python is run with -O or -OO, assert statements are stripped, so your two _assert_*_lock() methods would silently do nothing and unlocked reads and writes would go through.
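One way to make the checks survive -O is an explicit `if` plus `raise`. The following is a minimal sketch, not the original class: `LockGuard` and the `require_*` method names are hypothetical and only model the `locked` attribute, while `LockedFileError` mirrors the custom exception used in the tests in this answer.

```python
class LockedFileError(OSError):
    """Raised when a read or write is attempted without the needed lock."""


class LockGuard(object):
    """Tracks lock state only; a hypothetical stand-in for LockedFile."""

    def __init__(self):
        self.locked = None  # one of None, 'sh', 'ex'

    def require_read_lock(self):
        # An explicit raise is kept under `python -O`; an assert is not.
        if self.locked is None:
            raise LockedFileError("File is not locked")

    def require_write_lock(self):
        if self.locked != 'ex':
            raise LockedFileError("File is not locked exclusively")
```

In the real class, the bodies of the two checks would take the place of the assert statements, and the read and write methods would call them unchanged.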

Second: you need some tests. :) I took the liberty of adding a custom error class and writing a few tests. The first four pass and the last fails, which raises the question: what should happen if the file is opened normally (as some other non-LockedFile object) and data is written to it?

Oh, and lastly -- the name LockableFile makes more sense to me, since the file can be in an unlocked state.

Here are the changes I made:

class LockedFileError(OSError): # might want IOError instead
    pass

if __name__ == '__main__':
    import unittest
    import tempfile
    import shutil
    import os

    class TestLockedFile(unittest.TestCase):
        def setUp(self):
            self.dir = tempfile.mkdtemp()
            self.testfile = testfile = os.path.join(self.dir, 'opened.txt')
            temp = open(testfile, 'w')
            temp.write('[global]\nsetting1=99\nsetting2=42\n')
            temp.close()

        def tearDown(self):
            shutil.rmtree(self.dir, ignore_errors=True)

        def test_01(self):
            "writes fail if not locked exclusively"
            testfile = self.testfile
            temp = LockedFile(testfile, 'r+')
            self.assertRaises(LockedFileError, temp.write, 'arbitrary data')
            temp.lock_sh()
            self.assertRaises(LockedFileError, temp.write, 'arbitrary data')

        def test_02(self):
            "reads fail if not locked"
            testfile = self.testfile
            temp = LockedFile(testfile, 'r')
            self.assertRaises(LockedFileError, temp.read)

        def test_03(self):
            "writes succeed if locked exclusively"
            testfile = self.testfile
            temp = LockedFile(testfile, 'r+')
            temp.lock_ex()
            temp.write('arbitrary data\n')

        def test_04(self):
            "reads succeed if locked"
            testfile = self.testfile
            temp = LockedFile(testfile, 'r')
            temp.lock_sh()
            temp.readline()
            temp.lock_ex()
            temp.readline()

        def test_05(self):
            "other writes fail if locked exclusively"
            testfile = self.testfile
            temp = LockedFile(testfile, 'r')
            temp.lock_ex()
            testing = open(testfile, 'r+')
            # not sure if this should be OSError, IOError, or something else...
            self.assertRaises(OSError, testing.write, 'this should fail\n')

    unittest.main()

Many more tests should be written to cover the various combinations of a LockedFile with reads, writes, and other non-LockedFile file objects trying to read/write to the same actual file.
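A likely explanation for the failing fifth test is that flock locks are advisory: they only constrain code that also asks for a lock, and a plain write is never refused. The sketch below tries to show both halves of that in Python 3 on Unix. The function name `demo_advisory_lock` is made up for this example, and the behaviour assumes a platform such as Linux where each `open()` call gets its own independent flock state.

```python
import fcntl
import os
import tempfile


def demo_advisory_lock():
    """Return (plain_write_ok, second_lock_refused) for a scratch file."""
    fd, path = tempfile.mkstemp()
    os.close(fd)
    try:
        with open(path, 'r+') as holder, open(path, 'r+') as other:
            fcntl.flock(holder, fcntl.LOCK_EX)

            # A handle that never asks for the lock is not stopped.
            other.write('written without holding a lock\n')
            other.flush()
            plain_write_ok = True

            # A handle that does ask for the lock is refused right away
            # because LOCK_NB turns the wait into an error.
            try:
                fcntl.flock(other, fcntl.LOCK_EX | fcntl.LOCK_NB)
                second_lock_refused = False
            except OSError:  # BlockingIOError in Python 3
                second_lock_refused = True
        return plain_write_ok, second_lock_refused
    finally:
        os.remove(path)
```

Under those assumptions the write succeeds and the second lock request is refused, which suggests the last test should check that another LockedFile cannot take the lock, not that an ordinary file object cannot write.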
