您的位置:首页 > 编程语言 > Python开发

使用 Python 进行稳定可靠的文件操作

2014-05-13 15:52 288 查看
#coding:utf--8

#author:SU

#date:2014-5-22 10:36:47

# 问题的提出:对文件中数据进行操作,然后将结果保存到文件中

# 1、with open(filename) as f:

#2、input = f.read()

#3、output = do_something(input)

#4、with open(filename, 'w') as f:

#5、f.write(output)

#失效模式的例子:

#1、失控的服务器进程溢出大量日志,磁盘被填满。write()在截断文件之后抛出异常,文件将会变成空的。

#2、应用的几个实例并行执行。在各个实例结束之后,因为混合了多个实例的输出,文件内容最终变成了天书。

#3、在完成了写操作之后,应用会触发一些后续操作。几秒钟后断电。在我们重启了服务器之后,我们再一次看到了旧的文件内容。已经传递给其它应用的数据与我们在文件中看到的不再一致。

'''

1、原子性(Atomicity)要求这个事务要么完全成功,要么完全失败。在上面的实例中,磁盘满了可能导致部分内容写入文件。另外,如果正当在写入内容时其它程序又在读取文件,它们可能获得是部分完成的版本,甚至会导致写错误

2、一致性(Consistency) 表示操作必须从系统的一个状态到另一个状态。一致性可以分为两部分:内部和外部一致性。内部一致性是指文件的数据结构是一致的。外部一致性是指文件的内容与它相关的数据是相符合的。在这个例子中,因为我们不了解这个应用,所以很难推断是否符合一致性。但是因为一致性需要原子性,我们至少可以说没有保证内部一致性。

3、隔离性(Isolation)如果在并发的执行事务中,多个相同的事务导致了不同的结果,就违反了隔离性。很明显上面的代码对操作失败或者其它隔离性失败都没有保护。

4、持久性(Durability)意味着改变是持久不变的。在我们告诉用户成功之前,我们必须确保我们的数据存储是可靠的并且不只是一个写缓存。上面的代码已经成功写入数据的前提是假设我们调用write()函数,磁盘I/O就立即执行。但是POSIX标准是不保证这个假设的。

'''

# **********4种文件更新模式***************

# 1、截断-写:假设的域模型代码读数据,执行一些计算,然后以写模式重新打开存在的文件

#1、with open(filename,'r') as f:

#2、model.read(f)

#3、model.process()

#4、with open(filename,'w') as f:

#5、model.write(f)

#ex:以读写模式打开文件(Python中的“加”模式),寻找到开始的位置,显式调用truncate(),重写文件内容。

# 优势是只打开文件一次,始终保持文件打开。举例来说,这样可以简化加锁。

#1、with open(filename,'a+') as f:

#2、f.seek(0)

#3、model.input(f.read())

#4、model.compute()

#5、f.seek(0)

#6、f.truncate()

#7、f.write(model.output())

# 2、写-替换:将新内容写到临时文件,之后替换原始文件:

# 该方法与截断-写方法相比对错误更具有鲁棒性

# 1、with tempfile.NamedTemporaryFile(

# 2、 'w',dir = os.path.dirname(filename),delete = False) as tf:

# 3、 tf.write(model.output())

# 4、 tempname = tf.name

# 5、os.rename(tempname,filename)

#####这两个模式很常见,以至于linux内核中的ext4文件系统甚至可以自动检测到这些模式,自动修复一些可靠性缺陷。但是不要依赖这一特性:你并不是总是使用ext4,而且管理员可能会关掉这一特性。

# 3、追加:追加新数据到已存在的文件 这个模式用来写日志文件和其它累积处理数据的任务。从技术上讲,它的显著特点就是极其简单。一个有趣的扩展应用就是常规操作中只通过追加操作更新,然后定期重新整理文件,使之更紧凑。

#1、with open(filename,'a') as f:

#2、f.write(model.output())

#4、这里我们将目录做为逻辑数据存储,为每条记录创建新的唯一命名的文件:该模式与附加模式一样具有累积的特点。一个巨大的优势是我们可以在文件名中放入少量元数据。举例来说,这可以用于传达处理状态的信息。spooldir模式的一个特别巧妙的实现是maildir格式。maildirs使用附加子目录的命名方案,以可靠的、无锁的方式执行更新操作。md和gocept.filestore库为maildir操作提供了方便的封装。

#1、with open(unique_filename(),'w') as f:

#2、 f.write(model.output())

#如果你的文件名生成不能保证唯一的结果,甚至有可能要求文件必须实际上是新的。那么调用具有合适标志的低等级os.open():

#在以O_EXCL方式打开文件后,我们用os.fdopen将原始的文件描述符转化为普通的Python文件对象。

# 1、fd = os.open(filename,os.O_WRONLY | os.O_CREAT | os.O_EXCL,Oo666)

# 2、with os.fdopen(fd,'w') as f:

# 3、 f.write(...)

#原子性:写-替换模式提供了原子性,因为底层的os.rename()是原子性的。这意味着在任意给定时间点,进程或者看到旧的文件,或者看到新的文件。该模式对写错误具有天然的鲁棒性:如果写操作触发异常,重命名操作就不会被执行,所有就没有用损坏的新文件覆盖正确的旧文件的风险。

#附加模式并不是原子性的,因为有附加不完整记录的风险。但是有个技巧可以使更新具有原子性:为每个写操作标注校验和。之后读日志的时候,忽略所有没有有效校验和的记录。以这种方式,只有完整的记录才会被处理。在下面的例子中,应用做周期性的测量,每次在日志中附加一行JSON记录。我们计算记录的字节表示形式的CRC32校验和,然后附加到同一行:

# 1、with open(logfile,'ab') as f:

# 2、 for(i in range(3)):

# 3、 measure = {'timestamp':time.time(),'value':random.random()}

# 4、 record = json.dumps(measure).encode()

# 5、 checksum = '{:8x}'.format(zlib.crc32(record)).encode()

# 6、 f.write(record + b' ' + checksum + b'\n')

# tong过每次创建随机值模拟测量。

#$ cat log

# {"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a

# {"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22

# {"timestamp": 1373396987.258291, "value": 0.232021160265939} d229d937

#想要处理这个日志文件,我们每次读一行记录,分离校验和,与读到的记录比较。

# 1、with open(filename,'rb') as f:

# 2、 for line in f:

# 3、 record,checksum = line.strip().rsplit(b' ',1)

# 4、 if checksum.decode() == '{:8x}'.format(zlib.crc32(record)):

# 5、 print('read measure:{}'.format(json.loads(record.decode())))

# 6、 else:

# 7、 print('checksum error for record {}'.format(record))

#我们通过截断最后一行模拟被截断的写操作:

#$ cat log

#{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a

#{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22

#{"timestamp": 1373396987.258291, "value": 0.23202

#当读日志的时候,最后不完整的一行被拒绝:

#$ read_checksummed_log.py log

#read measure: {'timestamp': 1373396987.258189, 'value': 0.9360123151217828}

#read measure: {'timestamp': 1373396987.25825, 'value': 0.40429005476999424}

#checksum error for record b'{"timestamp": 1373396987.258291, "value":'

#spooldir中的单个文件也可以在每个文件中添加校验和。另外一个可能更简单的方法是借用写-替换模式:首先将文件写到一边,然后移到最终的位置。设计一个保护正在被消费者处理的文件的命名方案。在下面的例子中,所有以.tmp结尾的文件都会被读取程序忽略,因此在写操作的时候可以安全的使用。

# 1、newfile = generate_id()

# 2、with open(newfile + '.tmp','w') as f:

# 3、 f.write(model.output())

# 4、os.rename(newfile + '.tmp',newfile)

# 截断-写是非原子性的。很遗憾我不能提供满足原子性的变种。在执行完截取操作后,文件是空的,还没有新内容写入。如果并发的程序现在读文件或者有异常发生,程序中止,我们既看不到旧的版本也看不到新的版本。

#一致性:实际上,原子性更新是内部一致性的前提条件。外部一致性意味着同步更新几个文件。这不容易做到,锁文件可以用来确保读写访问互不干涉。考虑某目录下的文件需要互相保持一致。常用的模式是指定锁文件,用来控制对整个目录的访问。

#写程序example:

# 1、with open(os.path.join(dirname,'.lock'),'a+') as lockfile:

# 2、 fcntl.flock(lockfile,fcntl.LOCK_EX)

# 3、 model.update(dirname)

#读程序example:

# 1、with open(os.path.join(dirname,'.lock'),'a+') as lockfile:

# 2、 fcntl.flock(lockfile,fcntl.LOCK_SH)

# 3、 model.readall(dirname)

#该方法只有控制所有读程序才生效。因为每次只有一个写程序活动(独占锁阻塞所有共享锁),所有该方法的可扩展性有限。

#我们可以对整个目录应用写-替换模式。这涉及为每次更新创建新的目录,更新完成后改变符合链接。举例来说,镜像应用维护一个包含压缩包和列出了文件名、文件大小和校验和的索引文件的目录。当上流的镜像更新,仅仅隔离地对压缩包和索引文件进项原子性更新是不够的。相反,我们需要同时提供压缩包和索引文件以免校验和不匹配。为了解决这个问题,我们为每次生成维护一个子目录,然后改变符号链接激活该次生成。

'''

01 mirror

02 |-- 483

03 | |-- a.tgz

04 | |-- b.tgz

05 | `-- index.json

06 |-- 484

07 | |-- a.tgz

08 | |-- b.tgz

09 | |-- c.tgz

10 | `-- index.json

11 `-- current -> 483

'''

#新的生成484正在被更新的过程中。当所有压缩包准备好,索引文件更新后,我们可以用一次原子调用os.symlink()来切换current符号链接。其它应用总是或者看到完全旧的或者完全新的生成。读程序需要使用os.chdir()进入current目录,很重要的是不要用完整路径名指定文件。否在当读程序打开current/index.json,然后打开current/a.tgz,但是同时符号链接已经改变时就会出现竞争条件。

#隔离性意味着对同一文件的并发更新是可串行化的——存在一个串行调度使得实际执行的并行调度返回相同的结果。“真实的”数据库系统使用像MVCC这种高级技术维护可串行性,同时允许高等级的可并行性。回到我们的场景,我们最后使用加锁来串行文件更新。

#对截断-写更新进行加锁是容易的。仅仅在所有文件操作前获取一个独占锁就可以。

# 下面的例子代码从文件中读取一个整数,然后递增,最后更新文件:

# 1、def update():

# 2、 with open(filename,'r+') as f:

# 3、 fcntl.flock(f,fcntl.LOCK_EX)

# 4、 n = int(f.read())

# 5、 n += 1

# 6、 f.seek()

# 7、 f.truncate()

# 8、 f.write('{}\n'.format(n))

#写替换

# error代码

# 1、def update():

# 2、 with open(filename) as f:

# 3、 fcntl.flock(f,fcntl.LOCK_EX)

# 4、 n = int(f.read())

# 5、 n += 1

# 6、 with tempfile.NameTemporaryFile(

# 7、 'w',dir = os.path.dirname(filename),delete = False) as tf:

# 8、 tf.write('{}\n'.format(n))

# 9、 tempname = tf.name

# 10、 os.rename(tempname,filename)

# 设想两个进程竞争更新某个文件。第一个进程运行在前面,但是第二个进程阻塞在fcntl.flock()调用。当第一个进程替换了文件,释放了锁,现在在第二个进程中打开的文件描述符指向了一个包含旧内容的“幽灵”文件(任意路径名都不可达)

# 想要避免这个冲突,我们必须检查打开的文件是否与fcntl.flock()返回的相同。

#LockedOpen上下文管理器来替换内建的open上下文。来确保我们实际打开了正确的文件:

'''

class LockedOpen(object):

def __init__(self, filename, *args, **kwargs):

self.filename = filename

self.open_args = args

self.open_kwargs = kwargs

self.open_fileobj = None

def __enter__(self):

f = open(self.filename,*self.open_args,**self.open_kwargs)

while True:

fcntl.flock(f,fcntl.LOCK_EX)

fnew = open(self.filename,*self.open_args,**self.open_kwargs)

if os.path.sameopenfile(f.fileno(),fnew.fileno()):

fnew.close()

break

else:

f.close()

f = fnew

self.fileobj = f

return f

def __exit__(self, exc_type, exc_val, exc_tb):

self.fileobj.close()

'''

#给追加更新上锁如同给截断-写更新上锁一样简单:需要一个排他锁,然后追加就完成了。需要长期运行的会将文件长久的打开的进程,可以在更新时释放锁,让其它进入。

#spooldir模式有个很优美的性质就是它不需要任何锁。此外,你建立在使用灵活的命名模式和一个健壮的文件名分代。邮件目录规范就是一个spooldir模式的好例子。它可以很容易的适应其它情况,不仅仅是处理邮件。

#持久性: 持久性有点特殊,因为它不仅依赖于应用,也与OS和硬件配置有关。理论上来说,我们可以假定,如果数据没有到达持久存储,os.fsync()或os.fdatasync()调用就没有返回结果。在实际情况中,我们有可能会遇到几个问题:我们可能会面对不完整的fsync实现,或者糟糕的磁盘控制器配置,它们都无法提供任何持久化的保证

#通过截断-写模式,在结束写操作以后关闭文件以前,我们需要发送一个同步信号。注意通常这还牵涉到另一个层次的写缓存。glibc 缓存 甚至会在写操作传递到内核以前,在进程内部拦住它。同样为了得到空的glibc缓存,我们需要在同步以前对它flush():

# 1、with open(filename,'w') as f:

# 2、 model.write(f)

# 3、 f.flush()

# 4、 os.fdatasyns(f)

#大多数时候相较os.fsync()我更喜欢os.fdatasync(),以此避免同步元数据的更新(所有权、大小、mtime…)。元数据的更新可最终导致磁盘I/O搜索操作,这会使整个过程慢不少。

#确保在代替旧文件之前,新写入文件的内容已经写入了非易失性存储器上了

# 替换操作: 旧文件和新文件都在同一个目录下,我们可以使用简单的解决方案来逃避这个这题。

#1、os.rename(tempname,filename)

#2、dirfd = os.open(os.path.dirname(filename),os.O_DIRECTORY)

#3、os.fsync(dirfd)

#4、os.close(dirfd)

#我们调用底层的os.open()来打开目录(Python自带的open()方法不支持打开目录),然后在目录文件描述符上执行os.fsync()。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: