My tools of the trade for python programming.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

386 lines
11 KiB

#!/usr/bin/python
# $Id: file_utils.py,v 1.2 2010-09-27 19:54:29 wirawan Exp $
#
# wpylib.file.file_utils module
# File-manipulation utilities
#
# Wirawan Purwanto
# Created: 20090601
#
# Routines put here are commonly used in my own scripts.
# They are not necessarily suitable for general-purpose uses; evaluate
# your needs and see if they can them as well.
#
# 20090601: Created as pyqmc.utils.file_utils .
# 20100927: Moved to wpylib.file.file_utils .
#
"""
wpylib.file.file_utils
Common file-manipulation utilities.
This module is part of wpylib project.
"""
import bz2
import glob
import gzip
import os
import os.path
try:
import subprocess
has_subprocess = True
except:
has_subprocess = False
try:
import lzma
has_lzma = True
except:
try:
from backports import lzma
has_lzma = True
except:
has_lzma = False
from wpylib.sugar import is_iterable
class super_file(object):
'''"Super-file" hack wrapper for a file-like object.
Intended to allow extra capabilities to file-like iterators such as:
* ability to push back text lines for the subsequent next() calls.
This is to provide some level of rewinding in parsing text files.
* what else?
'''
def __init__(self, obj):
'''Creates a super_file wrapper around the "obj" object.'''
self.obj = obj
self.pushback = []
def __iter__(self):
return self
def close(self):
return self.obj.close()
def flush(self):
return self.obj.flush()
def next(self):
if len(self.pushback) > 0:
return self.pushback.pop()
else:
return self.obj.next()
def push(self, s):
self.pushback.append(s)
def open_input_file(fname, superize=0):
if fname.endswith(".bz2"):
fobj = bz2.BZ2File(fname, "r")
elif fname.endswith(".gz") or fname.endswith(".Z"):
fobj = gzip.GzipFile(fname, "r")
elif fname.endswith(".lzma"):
# until lzma has a "standard" python module, we use "lzma" executable:
if has_lzma:
fobj = lzma.LZMAFile(fname, "r")
else:
lzma_exe = path_search(os.environ["PATH"].split(os.pathsep),
("lzma", "xz"),
filetest=is_executable_file)
if lzma_exe == None:
raise IOError, "Cannot find lzma or xz executable file."
if has_subprocess:
px = subprocess.Popen((lzma_exe, "-dc", fname), stdout=subprocess.PIPE)
fobj = px.stdout
else:
fobj = os.popen('" -dc "' + fname + '"', "r")
elif fname.endswith(".xz"):
# until lzma has a "standard" python module, we use "lzma" executable:
if has_lzma:
fobj = lzma.LZMAFile(fname, "r")
elif has_subprocess:
px = subprocess.Popen(("xz", "-dc", fname), stdout=subprocess.PIPE)
fobj = px.stdout
else:
fobj = os.popen('xz -dc "' + fname + '"', "r")
else:
fobj = open(fname, "r")
if superize:
return super_file(fobj)
else:
return fobj
# Miscellaneous functions:
# - extended path manipulation/file inquiries (os.path-like functionalities)
def file_exists_nonempty(path):
"""Determines whether a given path is a regular file of
nonzero size."""
return os.path.isfile(path) and os.stat(path).st_size > 0
def is_executable_file(path):
"""Determines whether a regular file exists and is executable.
This implements the "-x" action of the shell's test command.
"""
# Ref: http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
return os.path.isfile(path) and os.access(path, os.X_OK)
def is_writable(path):
"""Determines whether a path exists and is writable by the current user,
like the `test -w' shell command.
"""
# Ref: http://stackoverflow.com/questions/2113427/determining-whether-a-directory-is-writeable
# Ref: http://stackoverflow.com/a/2113750/655885
return os.access(path, os.W_OK) # W_OK is for writing, R_OK for reading, etc.
def is_readable(path):
"""Determines whether a path exists and is readable by the current user,
like the `test -r' shell command.
"""
return os.access(path, os.R_OK)
def dirname2(path):
"""Returns the directory part of a path.
The difference from os.path.dirname is that if the directory
part is empty, it is converted to '.' (the current directory)."""
d = os.path.dirname(path)
if d == '': d = '.'
return d
# The following 3 routines are from
# http://code.activestate.com/recipes/208993-compute-relative-path-from-one-directory-to-anothe/
# by Cimarron Taylor
# (PSF license)
#
# (WP note: not sure if relpath below adds functionality or has different effects
# compared to os.path.relpath available in Python 2.6+).
def _pathsplit(p, rest=[]):
(h,t) = os.path.split(p)
if len(h) < 1: return [t]+rest
if len(t) < 1: return [h]+rest
return _pathsplit(h,[t]+rest)
def _commonpath(l1, l2, common=[]):
if len(l1) < 1: return (common, l1, l2)
if len(l2) < 1: return (common, l1, l2)
if l1[0] != l2[0]: return (common, l1, l2)
return _commonpath(l1[1:], l2[1:], common+[l1[0]])
def relpath(p1, p2):
"""Computes the relative path of p2 with respect to p1."""
(common,l1,l2) = _commonpath(_pathsplit(p1), _pathsplit(p2))
p = []
if len(l1) > 0:
p = [ '../' * len(l1) ]
p = p + l2
return os.path.join( *p )
# /// end code snippet
def path_split_all(p):
"""Completely decompose a filename path into individual components
that can be rejoined later.
"""
return _pathsplit(p)
def path_prep(*paths):
"""Like os.path.join, except that the directory part is created \
on-the-fly as needed."""
from os.path import dirname, isdir, join
path = join(*paths)
d = dirname(path)
mkdir_p(d)
return path
def mkdir_p(name):
"""A pure python implementation of my shell favorite `mkdir -p' command.
To conform to that command's behavior, we will not issue an error
if the file name exists and is a directory.
Returns 1 if new directories are made, returns -1 if nothing is done."""
from os.path import isdir
if isdir(name):
return -1
else:
os.makedirs(name)
return 1
# - globbing
def glob_files(filespec):
'''Processes a glob string, or does nothing (pass-on only) if an iterable object
(e.g. list or tuple) is already given.
When globbing is done, the result is sorted for predictability.'''
if getattr(filespec, "__iter__", False):
return filespec # no re-sorting
elif isinstance(filespec, basestring):
return sorted(glob.glob(filespec))
else:
raise ValueError, "Don't know how to glob for an object of " + type(filespec)
# - file searches and filesystem scans
def list_dir_entries(D, symlinks=False, sort=False):
"""Returns a list of files (actually, non-dirs) and dirs in a given directory.
If symlinks == True, the symbolic links will be separated from the rest.
This routine builds upon os.listdir() routine.
Will return a 4-tuple, containing:
- dir entries
- regular file and other non-dir entries
- symlink dir entries
- symlink regular file and other non-dir entries
The latter two would be empty if symlinks == False.
"""
from os.path import isdir, islink, join
entries = os.listdir(D)
dirs, nondirs = [], []
if symlinks:
s_dirs, s_nondirs = [], []
else:
s_dirs, s_nondirs = dirs, nondirs
rslt = {
# +-- symlink?
# v v--- dir or not
False: { True: dirs, False: nondirs },
True: { True: s_dirs, False: s_nondirs },
}
for E in entries:
full_E = join(D,E)
rslt[bool(islink(full_E))][bool(isdir(full_E))].append(E)
if sort:
if not isinstance(sort, dict):
sort = {}
dirs.sort(**sort)
nondirs.sort(**sort)
if symlinks:
s_dirs.sort(**sort)
s_nondirs.sort(**sort)
if symlinks:
return (dirs, nondirs, s_dirs, s_nondirs)
else:
return (dirs, nondirs, [], [])
def path_search(*specs, **opts):
'''Generalized path search.
Multiple paths can be specified for different parts of the sought filename,
and the first file found is returned.
Additional options:
* pathsep="/" -- path separator
* filetest=os.path.isfile -- filetest operator to be used
* raise_error=False -- do we want to raise an exception if the file
is not found after all possible searches?
'''
path_join = os.path.join
# FIXME: this can be extremely expensive!
xspecs = []
xlen = []
xstride = []
xtot = 1
pathsep = opts.get("pathsep", "/")
filetest = opts.get("filetest", os.path.isfile)
for spec in specs:
if not is_iterable(spec): # maybe a string?
xspecs.append((spec,))
xlen.append(1)
else:
xspecs.append(tuple([ x for x in spec ]))
xlen.append(len(xspecs[-1]))
xstride.append(xtot)
xtot *= xlen[-1]
for idx in xrange(xtot):
idx0 = idx
# Construct the filename based on the index: we reconstruct
# the indices for all the parts given in the argument, then
# concatenate them to get the full pathname
s = ""
for d in xrange(len(xspecs)-1,-1,-1):
a = idx0 / xstride[d]
if s == "":
s = xspecs[d][a]
else:
s = xspecs[d][a] + pathsep + s
idx0 = idx0 % xstride[d]
#print a,
#print s
if filetest(s):
return s
if opts.get("raise_error", False):
raise ValueError, "Cannot find file with specified combination"
else:
return None
def scan_directories(D, testdir):
"""Recursively scans a directory tree for candidate of
relevant directories, where testdir(D,dirs,files)
return a True boolean value.
We will *not* follow symlinks.
The testdir function must have this kind of prototype:
testdir(D, dirs, files)
where:
- D (first positional argument) is the directory under consideration
- dirs (named argument) is a list containing all subdirectory entries
contained in D (symlinks or not).
- files (named argument) is a list containing all non-subdirectory
entries contained in D (other symlinks, files, pipes, sockets, etc).
"""
rslt = []
for (d, dirs, files) in os.walk(D, topdown=True):
if testdir(d, dirs=dirs, files=files):
rslt.append(d)
return rslt
def untar(archive, subdir=None, verbose=None, files=[]):
'''Extracts a TAR archive. The destination directory can be given; otherwise
the files are extracted to the current directory.
Assuming GNU tar which accepts -z and -j switches.
LZMA compression is supported via lzma program.
'''
opts = [ 'tar' ]
# Python doc says: "the arguments to the child process must start with the
# name of the command being run"
if subdir:
opts += [ "-C", subdir ]
if archive.endswith(".tar.bz2") or archive.endswith(".tbz2") or archive.endswith(".tbz") or archive.endswith(".tb2"):
opts.append("-j")
elif archive.endswith(".tar.Z") or archive.endswith(".tar.gz") or archive.endswith(".tgz"):
opts.append("-z")
elif archive.endswith(".tar.lzma") or archive.endswith(".tza") or archive.endswith(".tlz"):
opts.append("--use-compress-program=lzma")
elif archive.endswith(".tar.xz") or archive.endswith(".txz"):
opts.append("--use-compress-program=xz")
if verbose:
for i in xrange(verbose): opts.append("-v")
opts += [ "-xf", archive ]
opts += files
return os.spawnvp(os.P_WAIT, "tar", opts)