#!/usr/bin/env python
''' faulkner's rc script which loads handy modules, snippets, [sub]classes, functions, etc.
'''
import sys, os, math, string, time, ConfigParser, re, urllib, ftplib, smtplib, timeit, operator
assert sys.version > '2.2', "upgrade your python."
import httplib, inspect, types, readline, atexit, socket, threading, copy, random, compiler
from pprint import pprint
from fnmatch import fnmatch
from binascii import hexlify, unhexlify
from colorsys import *
import cPickle as pickle
from glob import glob
try:
from path import path
path.__abs__ = path.abspath
path.ext = lambda self: os.path.splitext(self)[1]
path.base = path.basename
except: pass # http://www.jorendorff.com/articles/python/path/
try:
    from BitVector import BitVector
except ImportError: # http://cheeseshop.python.org/pypi/BitVector/1.2
    class BitVector:
        ''' empty placeholder used when the optional BitVector package cannot be imported.
        '''
        pass
# short alias; it names the placeholder class when the real package is missing
BV = BitVector
try:
try: import cElementTree as elementtree
except: import elementtree
except: pass #
try: import Numeric as N, LinearAlgebra as LA, FFT
except: pass
# http://sourceforge.net/projects/numpy
# http://www.pfdubois.com/numpy/html2/numpy.html
try: from crypt import crypt
except: pass # http://www.amk.ca/python/code/crypto.html
try: import ClientForm
except: pass # http://wwwsearch.sourceforge.net/ClientForm/
try: import Levenshtein
except: pass # http://trific.ath.cx/resources/python/levenshtein/
try: from subprocess import Popen
except: pass
try:
import swiginac, Symbolic
ginac = swiginac
except: pass # http://swiginac.berlios.de/
# optional ctypes conveniences; any exception raised below is swallowed by the
# bare except, which leaves the names that follow the failing line undefined.
try:
    import ctypes
    # NOTE(review): presumably written against an old ctypes release that exposed
    # cdll.find -- confirm the attribute still exists on the installed version
    ctypes.find = ctypes.cdll.find
    # shorter alias for ctypes.Structure
    ctypes.struct = ctypes.Structure
    def segfault():
        ''' various ways to cause a segfault using ctypes.
        '''
        # builds a char pointer holding address 1 and asks for its repr
        repr(ctypes.c_char_p(1))
except: pass # http://starship.python.net/crew/theller/ctypes/
try: import Image
except: pass # http://www.pythonware.com/products/pil/
try: import Tkinter
except: print "recompile with Tkinter support!"
try:
import EasyExtend
eval("""
""")
except: pass # http://www.fiber-space.de/EasyExtend/doc/EE.html
try: # http://www.cosc.canterbury.ac.nz/~greg/python/Pyrex/
    import Pyrex.Compiler.Main, Pyrex.Distutils, distutils.core, distutils.extension
    def pyrexc(c0de, *a, **kw):
        ''' build a Pyrex extension module and bind it as a global of this module.
        c0de is either the name of an existing .pyx file or Pyrex source text;
        source text is first written to '__test__.pyx' in the temp directory.
        the module name is the file's base name stripped of non-word characters.
        extra positional/keyword arguments are accepted and ignored.
        '''
        import tempfile
        if os.path.isfile(c0de):
            fn = c0de
        else:
            fn = os.path.join(tempfile.gettempdir(), '__test__.pyx')
            f = open(fn, 'w')
            try:
                f.write(c0de)
            finally:
                f.close()
        nym = re.sub(r'\W', '', os.path.splitext(os.path.basename(fn))[0])
        distutils.core.setup(
            name=nym,
            ext_modules=[distutils.extension.Extension(nym, [fn])],
            # distutils spells this keyword 'cmdclass'; 'cmd_class' is not recognised
            cmdclass={'build_ext': Pyrex.Distutils.build_ext}
        )
        globals()[nym] = __import__(nym)
# without Pyrex/distutils, pyrexc degrades to a no-op accepting the same call shapes
except Exception: pyrexc = lambda *a, **kw: None
try: # http://sourceforge.net/projects/pyinstant
    import Instant
    Inst = Instant.Instant()
    def inline(c0de, modul='test_mdl', nyms='*', **kw):
        ''' compile c0de into the extension module modul via Instant, then publish it.
        if nyms is a non-empty string it is used as the name list of
        "from modul import nyms" and the imported names become globals of this module;
        otherwise the module object itself is bound as a global called modul.
        extra keyword arguments are forwarded to Instant's create_extension.
        '''
        Inst.create_extension(code=c0de, module=modul, **kw)
        if isinstance(nyms, str) and (nyms != ''):
            # run the import in a scratch namespace so that only the imported
            # names (and not this function's own locals) reach globals()
            ns = {}
            exec('from %s import %s' % (modul, nyms), ns)
            ns.pop('__builtins__', None)
            globals().update(ns)
        else:
            globals()[modul] = __import__(modul)
except Exception: inline = lambda *a, **kw: None
try: # http://pymedia.org/
    import pymedia.player
    Playa = pymedia.player.Player()
    def play(i, recrs=True, loop=True, randomize=True, vol=75, play_only=('.wma', '.mp3')):
        ''' play media files on a daemon thread using the shared Playa player.
        'vol'[ume] is in range(256)
        "i" can be a filename, a directory name, or a tuple/list of filenames.
        if directory and 'recrs', it will recurse directory
        play_only, when a non-empty tuple/list, keeps only files with those extensions.
        loop repeats the playlist until the interpreter exits; randomize reshuffles each pass.
        returns 'wtf?' if i names neither a file nor a directory.
        '''
        assert isinstance(i, (list, tuple, str)), "the first argument must be a file/foldername, or a tuple/list of files"
        Playa.setVolume(vol)
        if isinstance(i, (list, tuple)):
            ls = list(i)  # copy, so shuffling never reorders the caller's sequence
        elif os.path.isfile(i):
            ls = [i]
        elif os.path.isdir(i):
            if recrs:
                ls = []
                for dirpath, dirnames, filenames in os.walk(i):
                    ls += [os.path.join(dirpath, x) for x in filenames]
            else:
                candidates = [os.path.join(i, x) for x in os.listdir(i)]
                ls = [p for p in candidates if os.path.isfile(p)]
        else: return 'wtf?'
        if isinstance(play_only, (tuple, list)) and play_only:
            ls = [x for x in ls if os.path.splitext(x)[1] in play_only]
        if Playa.isPlaying(): Playa.stop()
        if not Playa.isActive(): Playa.start()
        def playing_thread():
            # iterate rather than recurse: looping forever must not grow the stack,
            # and an empty playlist must not spin
            while ls:
                if randomize: random.shuffle(ls)
                for fn in ls:
                    Playa.startPlayback(fn); print(fn)
                    time.sleep(1)
                    time.sleep(Playa.getLength())
                if not loop: break
        t = threading.Thread(target=playing_thread)
        t.setDaemon(True)
        t.start()
except Exception:
    # pymedia is unavailable: expose the names as None so callers can test for them
    Playa = pymedia = None
# get more spiffy libraries here: http://www.python.org/pypi?%3Aaction=index
def slow_imports():
    ''' import the heavyweight optional plotting/GUI packages and publish helpers built on them.
    every name in the global statement is first set to None, so a package which
    fails to import simply leaves its names as None.
    '''
    global pylab, gtk, mk_dialog, mk_win, pango, gobject, sourceview, Lingos, plot, plotf
    pylab = gtk = mk_dialog = mk_win = pango = gobject = sourceview = Lingos = plot = plotf = None
    try:
        from matplotlib import pylab
        def plot(*funcs, **kw):
            ''' plot each of funcs over the sample points min, min+step, ... (max excluded).
            a str is eval'd with x and t bound to the sample, a list of numbers is
            plotted as-is, anything else is called with the sample.
            keywords: min, max, step, style (or sty). called with no funcs, prints usage.
            '''
            if len(funcs) == 0:
                print("example usage:\n plot('t', lambda x: x**3, 'x+x**3', min=-10, max=10, step=.1, style='r-')")
            else:
                lo = kw.get('min', -10)
                hi = kw.get('max', 10)
                step = kw.get('step', .1)
                sty = kw.get('style', kw.get('sty', ''))
                # range() needs integers, and lo/step is a float whenever step is
                xs = [k * step for k in range(int(round(lo / step)), int(round(hi / step)))]
                for f in funcs:
                    if isinstance(f, str):
                        pylab.plot(xs, [eval(f, {'x': x_, 't': x_}) for x_ in xs], sty)
                    elif isinstance(f, list) and all([isinstance(v, (int, float, long, complex)) for v in f]):
                        pylab.plot(f, sty)
                    else: pylab.plot(xs, [f(x_) for x_ in xs], sty)
                pylab.show()
        def plotf(filename, **kw):
            ''' plot the numbers stored in filename, one value (or comma-separated row) per line.
            '''
            fh = open(filename)
            try:
                data = fh.readlines()
            finally:
                fh.close()
            if ',' in data[0]:
                data = [[float(x) for x in row.split(',')] for row in data]
            else:
                data = [float(x) for x in data]
            bins = kw.get('bins')
            if bins:
                # NOTE(review): no two-argument bin() helper is visible in this
                # file -- confirm where it is supposed to come from
                data = bin(data, bins)
            plot(data, **kw)
    except Exception: pass
    # http://matplotlib.sourceforge.net/
    try:
        import gtk, pango, gobject
        def mk_dialog(title, buttons, parent=None):
            ''' return a separator-less gtk.Dialog which responds when Escape is pressed.
            run() on the result shows all widgets before entering the dialog loop.
            '''
            d = gtk.Dialog(title, parent, gtk.DIALOG_NO_SEPARATOR, buttons)
            def on_key(wid, evt):
                if evt.keyval == 65307:  # GDK keyval of Escape
                    # NOTE(review): the original referenced an undefined response id;
                    # RESPONSE_CANCEL is assumed -- confirm
                    wid.response(gtk.RESPONSE_CANCEL)
                    return True
                return False
            d.connect('key-press-event', on_key)
            d.run = lambda: [d.show_all(), gtk.Dialog.run(d)][1]
            return d
        def mk_win():
            ''' return a gtk.Window whose run() shows it and starts gtk.main on a new thread.
            '''
            w = gtk.Window()
            w.run = lambda: [w.show_all(), mk_thread(gtk.main)] and None
            return w
    except Exception: pass
    # http://www.pygtk.org/downloads.html
    # http://www.pcpm.ucl.ac.be/~gustin/win32_ports/pygtk.html
    try:
        try: import sourceview
        except ImportError:
            try: import pysourceview as sourceview
            except ImportError: import pygtksourceview as sourceview
        # language name -> language object, for every language the manager reports
        Lingos = dict([(lingo.get_name(), lingo) for lingo in sourceview.SourceLanguagesManager().get_available_languages()])
    except Exception: pass
def isgenerator(f):
return f.func_code.co_flags & compiler.consts.CO_GENERATOR
inspect.isgenerator = isgenerator
# run the slow optional imports on a daemon thread so start-up is not blocked by them
slow_imports_thread = threading.Thread(target=slow_imports)
slow_imports_thread.setDaemon(True)
slow_imports_thread.start()
# the Thread object was handed the function itself, so the module-level name can be dropped
del slow_imports
def mk_thread(targ, start=True):
    ''' wrap the callable targ in a threading.Thread and return the thread.
    the thread is started before being returned unless start is false.
    '''
    worker = threading.Thread(target=targ)
    if not start:
        return worker
    worker.start()
    return worker
def timeto(stmt, n=1e6):
    ''' return the seconds taken to execute stmt n times, as measured by timeit.
    the statement is set up with "from user import *".
    n may be given as a float (the default is 1e6); it is converted to an int
    because Timer.timeit needs an integer repetition count.
    '''
    return timeit.Timer(stmt, setup='from user import *').timeit(int(n))
# copy this module's globals into the timeit module's namespace
timeit.__dict__.update(globals())
def multiline_prompt(prompt1='>>> ', prompt2='... ', eval_=eval):
    ''' keep reading lines into one buffer until eval_ accepts the buffer.
    the first line is requested with prompt1 and each further line with prompt2.
    returns what eval_ returned, or None if ^C is hit at a continuation prompt.
    '''
    buffer = raw_input(prompt1)
    while True:
        try:
            return eval_(buffer)
        except:
            # whatever eval_ raised, treat the buffer as incomplete and read more
            pass
        try:
            buffer += raw_input(prompt2)
        except KeyboardInterrupt:
            print('')
            return
# readline history is kept in a '.pyhist' file in the directory of the interpreter executable
histfile = os.path.join(os.path.dirname(sys.executable), '.pyhist')
# best effort: any failure to read the history file is ignored
try: readline.read_history_file(histfile)
except: pass
# write the history back out when the interpreter exits
atexit.register(readline.write_history_file, histfile)
NULL = null = NIL = nil = None # in case i forget what language i'm in ;-)
# base-2 logarithm helper attached to the math module
math.log2 = lambda x: math.log(x, 2)
# NOTE: this rebinds math.exp itself (not only the local name exp) to a power of math.e
exp = math.exp = lambda z: math.e ** z
# the 256 single-character strings, indexed by ordinal
string.all = map(chr, range(256))
# the first 256 ** 2 unicode characters, indexed by ordinal (built eagerly, at import time)
string.uall = map(unichr, range(256 ** 2))
# fallbacks for interpreters lacking any of the three functional builtins
try: filter and map and reduce
except NameError:
    def filter(p, s):
        ''' list of the items of s for which p(item) is true; p=None keeps the true items.
        '''
        if p is None: return [i for i in s if i]
        return [i for i in s if p(i)]
    def map(f, *s):
        ''' list of f applied to the items of the sequences s taken in parallel.
        f=None returns the items themselves (tuples when several sequences are given).
        '''
        if f is None:
            if len(s) == 1: return list(s[0])
            return [i for i in zip(*s)]
        return [f(*i) for i in zip(*s)]
    # unique marker meaning "no initial value was passed to reduce"
    _reduce_no_initial = []
    def reduce(f, s, i=_reduce_no_initial):
        ''' fold the two-argument function f over s from the left.
        i, when given, is the starting value; otherwise the first item of s is.
        raises TypeError for an empty s when no starting value is given.
        '''
        r = i
        unseeded = i is _reduce_no_initial
        for x in s:
            if unseeded:
                r = x
                unseeded = False
            else:
                r = f(r, x)
        if unseeded:
            raise TypeError('reduce() of empty sequence with no initial value')
        return r
# fallback definitions for builtins which the running interpreter may lack
try: apply
except:
    def apply(f, a=(), kw={}):
        ''' call f with the positional arguments a and the keyword arguments kw.
        '''
        return f(*a, **kw)
try: reversed
except:
    def reversed(x):
        ''' generator yielding the items of the sequence x from last to first.
        '''
        if hasattr(x, 'keys'):
            raise ValueError("mappings do not support reverse iteration")
        for i in range(len(x) - 1, -1, -1):
            yield x[i]
try: any and all
except:
    def any(s):
        ''' True when at least one item of s is true.
        '''
        found = False
        for x in s:
            if x:
                found = True
                break
        return found
    def all(s):
        ''' True when no item of s is false.
        '''
        ok = True
        for x in s:
            if not x:
                ok = False
                break
        return ok
def h(a):
    ''' call help(a), silently discarding any exception raised while it runs.
    '''
    try: help(a)
    except: pass #won't print a huge ugly traceback when you Ctrl+C
def istextfile(path, blocksize=512):
    ''' guess whether the file at path holds text by sniffing its first blocksize bytes.
    returns False if path is not a regular file, if the sample contains a NUL byte,
    or if more than 30% of the sample lies outside string.printable.
    an empty file counts as text.
    '''
    if not os.path.isfile(path): return False
    # binary mode: the raw bytes are wanted, free of newline translation
    fh = open(path, 'rb')
    try:
        s = fh.read(blocksize)
    finally:
        fh.close()
    if not s: return True
    if '\0'.encode('ascii') in s: return False
    printable = string.printable.encode('ascii')
    odd = len([ch for ch in s if ch not in printable])
    if (float(odd) / len(s)) > 0.3: return False
    return True
def isiterable(o):
    ''' True when iter(o) succeeds, False when it raises anything at all.
    '''
    try:
        iter(o)
    except:
        return False
    else:
        return True
# also reachable as inspect.isiterable
inspect.isiterable = isiterable
def truth(o):
    ''' the truth value of o, as a bool.
    '''
    return not not o
def new_sock(tcp=True):
    ''' a fresh AF_INET socket: stream (TCP) when tcp is true, datagram (UDP) otherwise.
    '''
    if tcp: kind = socket.SOCK_STREAM
    else: kind = socket.SOCK_DGRAM
    return socket.socket(socket.AF_INET, kind)
def fact(n):
    ''' n factorial; every n below 2 gives 1.
    '''
    return reduce(operator.mul, range(2, n + 1), 1)
def c(x, y):
    ''' the binomial coefficient "x choose y", computed in exact integer arithmetic.
    going through floats would overflow for large x and lose precision well before that.
    '''
    return fact(x) // (fact(y) * fact(x - y))
def factor(n, pair=False):
    ''' prime factorisation of the integer n by trial division.
    returns the prime factors in ascending order, repeated by multiplicity;
    with pair=True returns a dict mapping each prime to its multiplicity instead.
    n below 2 has no factors.
    '''
    factors = []
    d = 2
    # a divisor larger than sqrt(n) cannot exist once the smaller ones are divided out
    while d * d <= n:
        if n % d == 0:
            factors.append(d)
            n //= d  # floor division keeps n an exact integer
        else:
            d += 1
    if n > 1:
        factors.append(n)  # whatever remains is itself prime
    if pair:
        f = {}
        for i in factors:
            f[i] = f.get(i, 0) + 1
        factors = f
    return factors
def kwarg_str_to_dict(s):
    ''' turn keyword-argument source text such as "a=1, b='x'" into the dict it denotes.
    s is eval'd as the argument list of a call, so never pass untrusted text.
    '''
    return eval('(lambda **kw:kw)(%s)' % s)
def modules():
    ''' names of the importable modules, scraped from the output of help('modules')
    run in a child interpreter. '(package)' markers and names starting with 'test'
    are dropped. raises ValueError if the expected help text is not found.
    '''
    # ask the interpreter which is running this script rather than a hard-coded launcher
    exe = sys.executable or 'py'
    if ' ' in exe: exe = '"%s"' % exe
    pipe = os.popen(exe + ' -c "help(\'modules\')"')
    try:
        s = pipe.read()
    finally:
        pipe.close()
    marker = 'a list of all available modules...\n\n'
    s = s[s.index(marker) + len(marker):s.index('\n\nEnter')].split()
    return [i for i in s if i != '(package)' and not i.startswith('test')]
def rl(itrbl):
    ''' range over the valid indices of itrbl, i.e. range(len(itrbl)).
    '''
    size = len(itrbl)
    return range(size)
def d(o=locals(), ns=True, lngth=10):
''' prettily prints lngth values at a time of o if it's an object with keys
and values, or o.__dict__ if it has one, or type(o).__dict__ if all else fails.
set ns=False if you don't want to see numbers before each key-value pair.
'''
try:
dct = o
ks = dct.keys()
dct.values()
ks.sort()
except:
try:
dct = o.__dict__
ks = dct.keys()
dct.values()
ks.sort()
except:
try:
dct = type(o).__dict__
ks = dct.keys()
dct.values()
ks.sort()
print "talkin bout", type(o)
except:
print "sorry, but i can't tell you anything about", o, ":-("
return
for i in xrange(len(ks)):
try:
if not ks[i] is '__builtins__':
if ns:
print i,
print ks[i], '\t', dct[ks[i]]
if (not (i % lngth)) and i:
a = raw_input('#hit enter to continue, or ^c to stop\n')
del a
except KeyboardInterrupt:
return
class __time__c:
    ''' stopwatch driven by repr(): each repr() returns "<count> <seconds since the previous repr()>".
    the count starts at 0; the first interval is measured from time 0.
    '''
    def __init__(self):
        self.marker = -1
        self.now = 0
        self.then = 0
    def __repr__(self):
        # shift the previous reading into `then` and take a new one
        self.then, self.now = self.now, time.time()
        self.marker += 1
        elapsed = self.now - self.then
        return '%s %s' % (self.marker, elapsed)
# ready-made instance: evaluating __time__ at the prompt shows the lap time
__time__ = __time__c()
def onlycontains(itrbl, typ):
    ''' True when every item of itrbl is an instance of typ (vacuously True when empty).
    '''
    checks = [isinstance(item, typ) for item in itrbl]
    return False not in checks
class Dict(dict):
    ''' dict with operator sugar (+ merges, - removes keys) and a few chainable helpers.
    '''
    def __add__(self, d):
        ''' self + d: a new Dict holding both; entries of d win on equal keys.
        '''
        r = Dict(self)
        dict.update(r, d)
        return r
    def update(self, d):
        ''' dict.update which returns self so that calls can be chained.
        '''
        dict.update(self, d)
        return self
    def __radd__(self, d):
        ''' d + self for a plain mapping d: entries of self (the right operand) win.
        '''
        return Dict(d).__add__(self)
    def sort(self, cmp_func=None):
        ''' the (key, value) items as a sorted list; cmp_func is an optional cmp-style function.
        '''
        l = list(self.items())
        if cmp_func is None: l.sort()
        else: l.sort(cmp_func)
        return l
    def sort_by_key(self, cmp_func=None):
        ''' the (key, value) items as a list ordered by comparing the VALUES (item[1]),
        despite the name. cmp_func is an optional cmp-style function applied to two
        values; when omitted the values' natural ordering is used.
        '''
        l = list(self.items())
        if cmp_func is None: l.sort(key=lambda kv: kv[1])
        else: l.sort(lambda a, b: cmp_func(a[1], b[1]))
        return l
    def inc(self, k, a=1, i=0):
        ''' increments self[k] by a, or initializes self[k] to i.
        returns self.
        yes, the try/except is fastest of all the equivalent idioms.
        '''
        try: self[k] += a
        except (KeyError, TypeError): self[k] = i
        return self
    def to_obj(self):
        ''' returns an object whose __dict__ is self
        '''
        class obj:
            def __init__(self_):
                self_.__dict__ = self
        return obj()
    def to_func(self):
        ''' returns a unary function taking keys, returning values.
        '''
        return lambda x: self[x]
    def intersect(self, od):
        ''' for a dict od: the items of od which self holds with an equal value.
        for a list/tuple of str: the items of self whose key is listed.
        returns None for any other od.
        '''
        if isinstance(od, dict):
            return Dict([(k, v) for k, v in od.items() if ((k in self) and (self[k] == v))])
        elif isinstance(od, (list, tuple)) and onlycontains(od, str):
            return Dict([(k, v) for k, v in self.items() if (k in od)])
    def __sub__(self, od, raiseonnotfound=False):
        ''' self - od: a copy of self without the keys named by od.
        od may be a dict (its keys are removed), a single str key, or a list/tuple of str keys.
        absent keys are ignored unless raiseonnotfound is true, in which case KeyError is raised.
        raises TypeError for any other od.
        '''
        if isinstance(od, dict): keys = list(od.keys())
        elif isinstance(od, str): keys = [od]
        elif isinstance(od, (list, tuple)) and onlycontains(od, str): keys = od
        else: raise TypeError("not a dict, list, str, or tuple")
        nd = Dict(self)
        for k in keys:
            if raiseonnotfound:
                del nd[k]
            elif k in nd:
                del nd[k]
        return nd
    def __rsub__(self, od):
        ''' od - self for a plain mapping od.
        '''
        return Dict(od).__sub__(self)
    def reverse(self):
        ''' a new Dict mapping each value back to its key (values must be hashable).
        '''
        return Dict([(v, k) for k, v in self.items()])
class DefaultedDict(dict):
    ''' dict whose item lookup returns self.default instead of raising.
    '''
    # value handed back for any failed lookup; override per class or per instance
    default = ''
    def __getitem__(self, key):
        try:
            value = dict.__getitem__(self, key)
        except:
            value = self.default
        return value
class List(list):
    ''' list whose mutators return the list itself so that calls can be chained.
    methods taking also_in_place modify and return self by default, or leave self
    untouched and return a modified copy when also_in_place is false.
    '''
    def reverse(self, also_in_place=True):
        ''' reverse the order of the items.
        '''
        if also_in_place:
            list.reverse(self)
            return self
        i = List(self)
        list.reverse(i)
        return i
    def append(self, o):
        ''' append o and return self.
        '''
        list.append(self, o)
        return self
    push = append
    def indices(self, c):
        ''' list of every index at which an item equal to c occurs.
        '''
        return [i for i, item in enumerate(self) if item == c]
    def chunkify(self, length=4):
        ''' List of consecutive slices of at most length items each.
        '''
        return List([self[i:i+length] for i in range(0, len(self), length)])
    def sort(self, cmp_func=None, also_in_place=True):
        ''' sort the items; cmp_func is an optional cmp-style comparison function.
        '''
        target = self
        if not also_in_place:
            target = List(self)
        if cmp_func is None: list.sort(target)
        else: list.sort(target, cmp_func)
        return target
    def rotate(self, n=1, right=True, also_in_place=True):
        ''' rotate the items n places, to the right by default.
        '''
        if len(self) == 0:
            # nothing to rotate, and n % 0 would raise
            rotated = []
        else:
            n %= len(self)
            if right: rotated = list(self[-n:]) + list(self[:-n])
            else: rotated = list(self[n:]) + list(self[:n])
        if also_in_place:
            # slice assignment changes this very object; rebinding `self` would not
            self[:] = rotated
            return self
        return List(rotated)
    def __mul__(self, n):
        ''' self * n: a List of n independent deep copies of self (a nested result).
        '''
        return List([copy.deepcopy(self) for i in range(n)])
def chunk(L, length=4):
    ''' list of consecutive slices of L, each at most length items long.
    '''
    pieces = []
    for start in xrange(0, len(L), length):
        pieces.append(L[start:start+length])
    return pieces
def chunkify(L, length=4):
    ''' like chunk, but goes through List.chunkify and so returns a List.
    '''
    return List(L).chunkify(length)
def unzip(L):
    ''' transpose a sequence of equal-length rows: item i of the result lists
    element i of every row. the width is taken from the first row.
    '''
    width = len(L[0])
    return [[row[i] for row in L] for i in range(width)]
# method form of unzip, attached to List
List.unzip = lambda self: List(unzip(self))
class Str(str):
    ''' str with assorted cipher, keyboard-layout, splitting and histogram helpers.
    '''
    # each key pairs the two characters of a QWERTY key with the two characters
    # they are translated to by dvorak()
    qwerty_dvorak = {'-_' : '[{', '=+' : ']}', 'qQ' : '\'"', 'wW' : ',<', 'eE' : '.>', 'rR' : 'pP', 'tT' : 'yY',
        'yY' : 'fF', 'uU' : 'gG', 'iI' : 'cC', 'oO' : 'rR', 'pP' : 'lL', '[{' : '/?', ']}' : '=+', '\\|' : '\\|',
        'aA' : 'aA', 'sS' : 'oO', 'dD' : 'eE', 'fF' : 'uU', 'gG' : 'iI', 'hH' : 'dD', 'jJ' : 'hH', 'kK' : 'tT',
        'lL' : 'nN', ';:' : 'sS', '\'"' : '-_', 'zZ' : ';:', 'xX' : 'qQ', 'cC' : 'jJ', 'vV' : 'kK', 'bB' : 'xX',
        'nN' : 'bB', 'mM' : 'mM', ',<' : 'wW', '.>' : 'vV', '/?' : 'zZ',}
    # the inverse table, used by qwerty()
    dvorak_qwerty = Dict(qwerty_dvorak).reverse()
    def rot_alpha(self, n):
        ''' Caesar shift: rotate every letter n places within its own case; other characters are kept.
        '''
        L = list(self)
        for i in range(len(L)):
            if L[i] in string.lowercase:
                L[i] = string.lowercase[(string.lowercase.index(L[i])+n)%26]
            elif L[i] in string.uppercase:
                L[i] = string.uppercase[(string.uppercase.index(L[i])+n)%26]
        return Str(''.join(L))
    def multi_split(self, *splits):
        ''' split self hierarchically. The first split key is the higher level one. Ex.:
        Str("a b\nc d").multi_split("\n", " ") => [['a', 'b'], ['c', 'd']]
        '''
        if len(splits) == 1: return self.split(splits[0])
        else: return [Str(s).multi_split(*splits[1:]) for s in self.split(splits[0])]
    def reverse(self):
        ''' the characters in reverse order.
        '''
        l = list(self)
        l.reverse()
        return Str(''.join(l))
    def indices(self, c):
        ''' list of every index at which the character c occurs.
        '''
        return [i for i, ch in enumerate(self) if ch == c]
    def sort(self, cmp_func=None):
        ''' the characters in sorted order; cmp_func is an optional cmp-style function.
        '''
        l = list(self)
        if cmp_func is None: l.sort()
        else: l.sort(cmp_func)
        return Str(''.join(l))
    def prepend(self, s):
        ''' s followed by self.
        '''
        return Str(s + self)
    def dvorak(self):
        ''' translate every character through the qwerty_dvorak table.
        '''
        return Str(self.translate(string.maketrans(''.join(self.qwerty_dvorak.keys()), ''.join(self.qwerty_dvorak.values()))))
    def qwerty(self):
        ''' inverse of dvorak(): translate through the dvorak_qwerty table.
        '''
        return Str(self.translate(string.maketrans(''.join(self.dvorak_qwerty.keys()), ''.join(self.dvorak_qwerty.values()))))
    # precomputed translation tables for the substitution ciphers below
    __rot13__ = string.maketrans('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ',
        'nopqrstuvwxyzabcdefghijklmNOPQRSTUVWXYZABCDEFGHIJKLM')
    rot13 = lambda self: self.translate(self.__rot13__)
    __rot47__ = string.maketrans(''.join(map(chr, range(33, 127))),
        ''.join(map(chr, range(80, 127) + range(33, 80))))
    rot47 = lambda self: self.translate(self.__rot47__)
    __atbash26__ = string.maketrans(string.lowercase + string.uppercase,
        'zyxwvutsrqponmlkjihgfedcbaZYXWVUTSRQPONMLKJIHGFEDCBA')
    atbash26 = lambda self: self.translate(self.__atbash26__)
    def chunk(self, length=2):
        ''' list of consecutive substrings of at most length characters.
        '''
        return [self[i:i+length] for i in range(0, len(self), length)]
    def histogram(self, split_pattern=None, sort=True):
        ''' count occurrences of each character, or of each piece produced by
        re.split(split_pattern, self) when split_pattern is given.
        returns a List of (item, count) pairs ordered by descending count when sort
        is true, otherwise the plain dict of counts.
        '''
        r = {}
        s = self
        if split_pattern: s = re.split(split_pattern, s)
        for c in s:
            if c in r: r[c] += 1
            else: r[c] = 1
        if sort: r = List(r.items()).sort(lambda x, y: [[1, -1], [0, 0]][x[1] == y[1]][x[1] > y[1]])
        return r
    __english__ = 'etaonirs'
    def fromhistogram(self, d, capsonly=True):
        ''' unfinished: ranks the items of d by descending count, then returns self unchanged.
        '''
        ranked = List(d.items()).sort(lambda x, y: [[1, -1], [0, 0]][x[1] == y[1]][x[1] > y[1]])
        # XXX
        return self
    def __call__(self, *a, **kw):
        ''' eval the text of self; arguments are passed on to eval. never call on untrusted text.
        '''
        return eval(self, *a, **kw)
    def __sub__(self, s):
        ''' self - s: self without its trailing s. raises TypeError unless self ends with s.
        '''
        if not self.endswith(s): raise TypeError("you can't subtract stringA from stringB unless stringB ends with stringA")
        # self[:-0] would be empty, so the empty suffix needs its own case
        if len(s) == 0: return Str(self)
        return Str(self[:-len(s)])
    def __rsub__(self, s):
        ''' s - self for a plain str s.
        '''
        return Str(s).__sub__(self)
def average(*a):
    ''' arithmetic mean of the arguments, as a float. raises ZeroDivisionError with no arguments.
    '''
    total = sum(a)
    return float(total) / len(a)
def long_len(s):
r = 0L
for c in s:
r += 1L
return r
def wai(start=None, stop=None, step=None):
    ''' while-and-increment -- faster xrange allowing use of longs.
    wai(stop), wai(start, stop) and wai(start, stop, step) mirror range();
    wai() with no arguments counts from 0 up to (not including) 10.
    each argument is defaulted on its own, so an explicit stop or step is always honoured.
    raises ValueError when step is 0.
    '''
    if step is None:
        step = 1
    if step == 0:
        raise ValueError('wai() step must not be zero')
    if stop is None:
        # single-argument form: the lone value is the stop
        if start is None: stop = 10
        else: stop = start
        start = 0
    elif start is None:
        start = 0
    if step > 0: keep_going = operator.lt
    else: keep_going = operator.gt
    while keep_going(start, stop):
        yield start
        start += step
class LongFloat:
    ''' implementation of float using 2 longs.
    arbitrary decimal precision with arbitrary size.
    LongFloat('-2.456789876545678e-74567898765456789') ==
    LongFloat(-2456789876545678L, (-74567898765456789 - 15)L) ==
    LongFloat(-2456789876545678L, (-74567898765456789 - 15)L) ==
    -2456789876545678L * (10L ** -(74567898765456789 - 15)L)
    all arithmetic operations return LongFloats.
    bitwise operations are funny.

    NOTE(review): this class looks unfinished -- __pow__ has no body beyond its
    docstring, __add__ returns nothing, and __init__ stores nothing when base is
    not a str; see the notes below before relying on it.
    '''
    def __init__(self, base, exp=None):
        # only str values of base are handled; the digits are stored as longs
        # in self.base / self.exp
        if isinstance(base, str):
            if 'e' in base.lower():
                # scientific notation: split mantissa and exponent at the 'e'
                base, exp = base.lower().split('e')
                # NOTE(review): self.base / self.exp are read here before they were
                # ever assigned; presumably the locals base / exp were meant -- confirm
                self.base = long(self.base.replace('.', ''))
                self.exp = long(self.exp.replace('.', ''))
                # exp_pos is only ever set in this branch, yet __repr__ always reads it
                if '.' in exp:
                    self.exp_pos = len(exp) - exp.index('.')
                else:
                    self.exp_pos = len(exp)
            elif base.count('.') == 1:
                # plain decimal text: keep the digits; exp records the distance
                # from the decimal point to the end of the text
                self.base = long(base.replace('.', ''))
                self.exp = len(base) - base.index('.')
            elif isinstance(exp, str):
                # NOTE(review): base.index('.') raises ValueError when base holds no
                # '.', which is the only way to reach this branch -- confirm intent
                self.base = long(base.replace('.', ''))
                self.exp = long(exp) + len(base) - base.index('.')
    def __repr__(self):
        # "<base>e<exp>" with a '.' inserted exp_pos characters from the end
        x = str(self.base) + 'e' + str(self.exp)
        return x[:-self.exp_pos] + '.' + x[-self.exp_pos:]
    __str__ = __repr__
    def copy(self):
        # NOTE(review): passes the stored longs back to __init__, which only
        # handles str arguments -- confirm
        return LongFloat(self.base, self.exp)
    def __pow__(self, other):
        '''
        '''
    def __add__(self, other):
        other = other.copy() # don't modify the other LongFloat
        # XXX normalize the exponents
        # XXX return a new LongFloat with the sum of the bases, and the normalized fundamentals and exponents
class Int(int):
    ''' int with bit rotation and conversion to/from other bases (through rebase).
    '''
    def rotate(self, n=1, right=True, width=None):
        ''' rotate the binary digits of self n places (to the right by default).
        width, when given, zero-pads the digit string to that many bits before rotating.
        '''
        # rebase takes (value, frombase, tobase): decimal in, binary digits out
        i = rebase(int(self), 10, 2) or '0'
        if width != None:
            i = i.zfill(width)
        # reduce n only once the final width is known
        n %= len(i)
        if right: return Int(i[-n:] + i[:-n], 2)
        else: return Int(i[n:] + i[:n], 2)
    def to(self, base=2, alphabet=None):
        ''' the digits of self in the given base, as a str; alphabet optionally names the digit characters.
        '''
        return rebase(int(self), 10, base, None, alphabet)
    def from_(i, frombase=2, fromalphabet=None):
        ''' inverse of to(): build an Int from the digit string i written in frombase.
        '''
        return Int(int(rebase(i, frombase, 10, fromalphabet)))
    from_ = staticmethod(from_)
class Complex(complex):
    ''' complex with polar-form helpers.
    '''
    def to(self):
        ''' returns a tuple such that
        self == self.to()[0] * (math.e ** (1j * self.to()[1]))
        '''
        return self.mag(), self.phase()
    def mag(self):
        ''' the modulus |self|.
        '''
        return (((self.real ** 2) + (self.imag ** 2)) ** .5)
    def phase(self):
        ''' the argument of self in radians, in (-pi, pi].
        atan2 is used so that all four quadrants and the imaginary axis come out right.
        '''
        return math.atan2(self.imag, self.real)
    def from_(*a):
        ''' inverse of self.to(): accepts either (mag, phase) as two arguments
        or a single tuple/list holding them.
        '''
        if (len(a) == 1) and isinstance(a[0], (tuple, list)): a = a[0]
        return Complex(a[0] * (math.e ** (a[1] * 1j)))
    from_ = staticmethod(from_)
    def roots(self, n):
        ''' returns a list of Complex objects for whom c**n == self
        '''
        mag = self.mag() ** (1./n)
        phase = self.phase()
        return [Complex.from_(mag, (phase + 2*math.pi*k)/n) for k in range(n)]
    def test(self, n=10):
        ''' self-check: the polar round trip and each of the n n-th roots must
        reproduce self to within 0.1 per component; raises AssertionError otherwise.
        '''
        t = (self - Complex.from_(self.to()))
        assert -.1 < t.real < .1, 'real'
        assert -.1 < t.imag < .1, 'imag'
        roots = self.roots(n)
        for i in range(n):
            t = (self - (roots[i] ** n))
            assert -.1 < t.real < .1, 'root %d %r %r' % (i, roots[i], t)
            assert -.1 < t.imag < .1, 'root %d %r %r' % (i, roots[i], t)
def dict_to_ul(d, *order):
    ''' translate a dict [whose keys are all strings, and values are (str, unicode, int, long, float, dict, bool)] into an html list.
    order is a sequence of keys in d or dicts mapping 1 or more d-keys to their orders.
    dict_to_ul({'a':1,'b':{'b1':1,'b2':2}},'a',{'b':['b1','b2']})
    key-value pairs in d not specified in order are after the ordered keys in no particular order
    keys and values are inserted as they are, without html escaping.

    NOTE(review): the markup literals of this function had lost their tags; the
    <ul>/<li> tags below are a reconstruction -- confirm against the intended output.
    '''
    res = ['<ul>\n']
    def add(k, v, order=()):
        # nested dicts become nested lists, rendered with their own sub-order
        if isinstance(v, dict):
            res.append('<li>%s:%s</li>\n' % (k, dict_to_ul(v, *order)))
        else:
            res.append('<li>%s:<br/>%s</li>\n' % (k, v))
    # pairs still to be emitted once the explicitly ordered ones are done
    items = list(d.items())
    for order_elem in order:
        if isinstance(order_elem, dict):
            for k, sub_order in order_elem.items():
                if k in d:
                    v = d[k]
                    add(k, v, sub_order)
                    try: items.remove( (k, v) )
                    except ValueError: pass
        elif order_elem in d:
            v = d[order_elem]
            add(order_elem, v)
            try: items.remove( (order_elem, v) )
            except ValueError: pass
    for k, v in items:
        add(k, v)
    return ''.join(res) + '</ul>\n'
class FFind:
    ''' re.finditer and re.sub for large files.

    NOTE(review): this class looks unfinished -- several attributes and one
    method used below are never defined (see the notes), and __iter__ contains
    no yield, so iterating over an instance cannot work as written.
    '''
    def __init__(self, filename, find, replace_str='',
            start_i=0, end_i=-1, start_line_i=0, end_line_i=-1,
            bufsize=1000, temp_filename=None, linesep=re.compile('\r?\n?'),
            do_replace=False, directive_quantifier=0, overwrite_file=False
            ):
        ''' search and replace through large files.
        start_i, end_i specify the byte offsets at which to start/end.
        start_line_i, end_line_i specify the line offsets at which to start/end, and override start_i, end_i.
        linesep is only used to number lines for start_line_i and end_line_i.
        find is a str if you want to find literally, or it can be an _sre.SRE_Pattern object as returned by re.compile.
        replace_str can be a str or a callable taking a match object.
        if temp_filename is None, i will make a temp file [with a name composed of underscores] in the OS-appropriate directory.
        if do_replace: the temp file will be created and written according to the other arguments.
        else: no temp file will be made, and you can iterate over me to obtain matches.
        '''
        # NOTE(review): replace_str, start_i, bufsize, linesep, directive_quantifier
        # and overwrite_file are accepted but never stored on self
        self.filename = filename
        self.file = file(filename)
        self.do_replace = do_replace
        self.buffer = ''
        self.i = 0
        self.line_i = 0
        self.end_i = end_i
        self.end_line_i = end_line_i
        # XXX def is_at_end():
        # a literal str is escaped and compiled; anything exposing match and sub is used as-is
        if isinstance(find, str):
            self.find = re.compile(re.escape(find))
        elif hasattr(find, 'match') and hasattr(find, 'sub'):
            self.find = find
        else:
            raise TypeError, 'find must be an instance of _sre.SRE_Pattern, as returned by re.compile.'
        if do_replace:
            if temp_filename is None:
                self.temp_filename = ['/tmp/_', 'C:\\TEMP\\_'][os.name == 'nt']
                # fall back to the current directory when the temp directory is missing
                if not os.path.exists(os.path.dirname(self.temp_filename)):
                    self.temp_filename = os.path.join(os.curdir, '_')
                while os.path.exists(self.temp_filename):
                    # make damn sure we don't clobber something
                    self.temp_filename += '_'
            # NOTE(review): a caller-supplied temp_filename is never copied to
            # self.temp_filename, so this line would fail in that case -- confirm
            self.temp_file = file(self.temp_filename, 'w')
            # XXX copy to start
            # XXX replace, writing to self.temp_file
        else:
            self.temp_filename = None
            self.temp_file = None
        if start_line_i > 0:
            # consume lines from the file until the requested line index is reached
            for i, line in enumerate(self.file):
                if i == start_line_i:
                    break
            self.line_i = start_line_i
        else:
            # NOTE(review): self.start_i is never assigned; presumably the start_i
            # argument was meant -- confirm
            self.file.seek(self.start_i)
    def __list__(self):
        return [match for match in self]
    def __iter__(self):
        ''' yield successive matches in self.filename.
        '''
        if self.do_replace:
            raise TypeError, "i already wrote a modified copy of your file. do not specify do_replace if you want to iterate over me."
        else:
            # NOTE(review): self.f, self.get_stopped and self.bufsize are not defined
            # anywhere in this class (the file handle is stored as self.file)
            c = self.f.read(1)
            while c and not self.get_stopped():
                # keep only the most recent bufsize characters
                self.buffer = (self.buffer + c)[-self.bufsize:]
                # XXX conditional yield
                c = self.f.read(1)
def get_my_ip(host='www.google.com', port=80):
    ''' returns the actual ip of the local machine.
    a TCP connection is opened to (host, port) and the local address of that
    connection is returned; the socket is closed even if connecting fails.
    raises socket.error when the connection cannot be made.
    '''
    csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        csock.connect((host, port))
        addr = csock.getsockname()[0]
    finally:
        csock.close()
    return addr
def clear(h=10, w=80):
    ''' build the control string [which you must print] meant to blank the last h terminal lines.
    one unit is: carriage return, w spaces, carriage return, backspace; it is repeated h times.
    '''
    one_line = ''.join(['\r', ' ' * w, '\r\b'])
    return one_line * h
def branch(n=100):
    ''' yield (0,0),(1,0),(0,1),(2,0),(1,1),(0,2),(3,0),(2,1),(1,2),(0,3),(4,0),(3,1),(2,2),(1,3),(0,4),...,(n,0),...,(0,n)
    i.e. every pair of non-negative ints whose sum is at most n, one diagonal
    after another; each pair is produced exactly once.
    '''
    # x is the sum shared by one diagonal; range(n + 1) makes the last diagonal (n,0)...(0,n)
    for x in range(n + 1):
        for i in range(x + 1):
            yield ((x - i), i)
print
class EventQueue(threading.Thread):
    ''' queues function calls, saves their return values in self.return_values.
    >>> eq = EventQueue() # start a thread watching self.queue
    >>> eq.push(slow_func); eq.push(slow_func, args); eq.push(slow_func, args, kwargs)
    # push some slow [but terminating!] function calls, which will be executed in the order in which they were pushed
    >>> eq.stop() # stop the thread as soon as the current call returns, possibly preventing some calls from being executed.
    calls are numbered 1, 2, 3, ... in the order pushed; self.n is the number of
    the last call which has finished.
    '''
    def __init__(self):
        threading.Thread.__init__(self)
        self.queue = []
        self.nap_time = .1
        self.n = 0
        self.serving = None
        self.keep_history = True
        self.return_values = {} # maps numbers of calls to their return values; unused if not self.keep_history
        self.running = True
        self._pushed = 0 # how many calls have been pushed so far
        self._push_lock = threading.Lock()
        # start last: run() reads the attributes set above
        self.start()
    def error(self, error, function, a=(), kw=None):
        ''' called when function raises error.
        '''
        parts = ['EventQueue event raised exception (', function, a, kw or {}, '):', error]
        sys.stderr.write(' '.join([str(p) for p in parts]) + '\n')
    def push(self, function, a=(), kw=None):
        ''' queue the call to function, return the number of the call.
        '''
        # numbering and queueing happen together so concurrent pushers stay in step
        self._push_lock.acquire()
        try:
            self._pushed += 1
            number = self._pushed
            self.queue.append( (function, a, kw or {}) )
        finally:
            self._push_lock.release()
        return number
    def stop(self):
        ''' ask the thread to exit once the call being served (if any) returns.
        '''
        self.running = False
    def get(self, n):
        ''' Block until self.n >= n; return whatever the n-th call returned [assuming self.keep_history].
        returns None if the call raised, if history is off, or if the queue was stopped first.
        '''
        while self.n < n and self.running:
            time.sleep(self.nap_time)
        return self.return_values.get(n, None)
    def run(self):
        ''' a blocking loop which continually calls functions as specified in self.queue.
        '''
        while self.running:
            if len(self.queue) == 0:
                time.sleep(self.nap_time)
            else:
                function, a, kw = self.queue.pop(0)
                self.serving = (function, a, kw)
                # calls are served in push order, so this is the number push() returned
                number = self.n + 1
                try:
                    if self.keep_history:
                        self.return_values[number] = function(*a, **kw)
                    else:
                        function(*a, **kw)
                except Exception:
                    self.error(sys.exc_info()[1], function, a, kw)
                self.n = number
def queue_event(f):
    ''' decorator which queues method/function calls in
    self.eventqueue [if f is a method whose first argument is 'self'],
    otherwise f.eventqueue.
    the decorated callable only queues the call; it returns None.
    '''
    argnames = inspect.getargspec(f)[0]
    is_method = bool(argnames) and (argnames[0] == 'self')
    if is_method:
        # methods use the queue owned by their instance
        def decorated(self, *a, **kw):
            self.eventqueue.push(f, (self,) + a, kw)
    else:
        # plain functions get a queue of their own, created at decoration time
        f.eventqueue = EventQueue()
        def decorated(*a, **kw):
            f.eventqueue.push(f, a, kw)
    decorated.__doc__ = f.__doc__
    decorated.__name__ = f.__name__
    return decorated
def rebase(i, frombase=None, tobase=None, fromalphabet=None, toalphabet=None, resize=1, too_big=40000, debug=False):
    ''' if frombase is not specified, it is guessed from the type and/or char in i with highest ord.
    tobase defaults to [10, 2][frombase == 10].
    the alphabets are map(chr, range(256)) if its base is between 62 and 255;
    otherwise, string.digits+string.ascii_letters.
    always returns a string which is also valid input (zero converts to the zero digit).
    valid bases are ints in range(-256, 257).
    alphabets must be subscriptable, and can only contain str's.
    a fromalphabet given explicitly is always used to decode i.
    invalid tobases are replied with 'why?'; rebase('why?') == '217648673'.
    returned string is zfilled to the next largest multiple of resize
    raises ValueError if the result would need too_big or more digits.
    '''
    __todo__ = [
        'negative i?',
    ]
    alnum = string.digits + string.ascii_letters
    # (int, long) where long exists, plain (int, int) otherwise
    int_types = (int, type(1 << 64))
    if frombase is None:
        if isinstance(i, int_types):
            frombase = 10
        elif isinstance(i, str):
            a = str(i)
            if any([(ch not in alnum) for ch in a]):
                # a non-alphanumeric character is present: treat characters as raw ordinals
                frombase = max(map(ord, a)) + 1
            else:
                frombase = max([alnum.index(ch) for ch in a]) + 1
    if tobase is None:
        tobase = [10, 2][frombase == 10]
    # got bases, ensuring that everything is an int
    tobase = int(tobase)
    frombase = int(frombase)
    abstobase = abs(tobase)
    absfrombase = abs(frombase)
    if absfrombase in [0, 1]:
        i = len(str(i))
    elif (2 <= frombase <= 36) and (fromalphabet is None):
        # may be difficult to translate to C
        i = int(str(i), frombase)
    else:
        i = str(i)
        n = 0
        if fromalphabet is None:
            if 62 <= absfrombase <= 256:
                fromalphabet = [chr(x) for x in range(256)]
            else:
                fromalphabet = alnum
        fromalphabet = fromalphabet[:absfrombase]
        # positional decoding, least significant digit first
        for j in range(len(i)):
            n += (frombase ** j) * fromalphabet.index(i[-1-j])
        i = n
    # got ints, converting to tobase
    if debug: print('converting %d from base %d to %d' % (i, frombase, tobase))
    if abstobase in [0, 1]:
        return '0' * ((i > 0) and int(i) or 0)
    elif abstobase > 256:
        return 'why?'
    # if execution gets here, we might want the result to be zfilled to a multiple of resize
    r = ''
    if tobase == 10:
        r = str(i)
    else:
        if i < 0:
            sys.stdout.write('negative ')
            i = -i
        if toalphabet is None:
            if 62 <= abstobase <= 256:
                toalphabet = [chr(x) for x in range(abstobase)]
            else:
                toalphabet = alnum[:abstobase]
        if tobase < 0:
            i = -i
        j = 0
        while i != 0:
            r = toalphabet[i % tobase] + r
            i //= tobase  # floor division: digits must stay exact integers
            j += 1
            if j >= too_big: raise ValueError("call again, setting too_big bigger")
        if r == '':
            # the value was zero: emit the zero digit rather than an empty string
            r = toalphabet[0]
    if resize > 1:
        if 62 <= abstobase <= 256:
            r = toalphabet[0] * (resize - (len(r) % resize)) + r
        else:
            r = r.zfill(len(r) + resize - (len(r) % resize))
    return r
class execc:
    ''' holds statements which are exec'd each time the object's repr() is taken.
    positional arguments are the statements; the keyword 'ret' is the text repr() returns.
    '''
    def __init__(self, *a, **kw):
        self.a, self.kw = a, kw
    def __repr__(self):
        # run every stored statement, in order, then report the 'ret' text
        for i in self.a:
            exec(i)
        return self.kw.get('ret', '')
    def __call__(self, *a, **kw):
        # run the stored statements followed by the extra ones; returns None
        merged = Dict(self.kw) + Dict(kw)
        repr(execc(*(self.a + a), **merged))
class thread(threading.Thread):
    ''' a Thread wrapping func; calling the object with arguments starts the
    thread running func(*arguments) and returns the thread.
    '''
    def __init__(self, func):
        # the arguments are kept on the instance, not in Thread's private fields
        threading.Thread.__init__(self, target=self._call_func)
        self._func = func
        self._func_args = ()
    def _call_func(self):
        # thread body: apply the wrapped function to the recorded arguments
        self._func(*self._func_args)
    def __call__(self, *a):
        self._func_args = a
        self.start()
        return self
    def stop(self):
        # NOTE(review): relies on a private Thread method which may not exist on
        # every interpreter, and presumably does not interrupt running code -- confirm
        stopper = getattr(self, '_Thread__stop', None)
        if stopper is not None:
            stopper()
# decorator form: calling the wrapped function starts it on a new thread and returns that thread
threadify = threaded = lambda func: lambda *a: thread(func)(*a)
def defined(n):
    ''' an excessively magical function which returns a boolean
    indicating whether a name (str) n is defined in the calling frame
    (as a builtin, a global, or a local).
    '''
    # __builtins__ is the module in a main script but a plain dict elsewhere
    builtin_names = __builtins__
    if not isinstance(builtin_names, dict):
        builtin_names = builtin_names.__dict__
    if n in builtin_names: return True
    f = sys._getframe(1)
    return (n in f.f_globals) or (n in f.f_locals)
class SH:
    ''' process ~/.bashrc, save aliases/functions, run commands based on them.
    an extremely simple shell written in python.
    >>> SH().mainloop()
    '''
    __todo__ = [
        'backtick',
        'tab completion?',
    ]
    # alias NAME="COMMAND" or alias NAME='COMMAND'
    _alias_re = re.compile(r'''^\s*alias\s+(?P<name>\w+)\s*=(?P<quote>["'])(?P<cmd>.+)(?P=quote)\s*$''')
    def __init__(self, rc='~/.bashrc'):
        ''' collect the alias definitions found in the file rc into self.aliases.
        a missing or unreadable rc file simply yields no aliases.
        a built-in 'cd' alias changes this process's working directory.
        '''
        self.aliases = {}
        in_func = False
        func_buffer = ''
        try:
            rc_file = open(os.path.expanduser(rc))
        except IOError:
            lines = []
        else:
            try:
                lines = rc_file.readlines()
            finally:
                rc_file.close()
        for line in lines:
            if in_func:
                # nothing sets in_func yet: shell functions are not supported (see __todo__)
                if re.match(r'\s*\}\s*', line):
                    # do something with func_buffer
                    func_buffer = ''
                    in_func = False
                else:
                    func_buffer += line
            elif line.startswith('alias'):
                m = self._alias_re.match(line)
                if m is not None:
                    self.aliases[m.group('name')] = m.group('cmd')
        self.aliases['cd'] = lambda directory: os.chdir(os.path.expanduser(directory)) or ''
    def unalias(self, cmd):
        ''' expand the first word of cmd if it is an alias.
        str aliases replace the word; callable aliases are called with the rest of
        the line and their result is returned. anything else comes back unchanged.
        '''
        splitted = cmd.split()
        if not splitted:
            return cmd
        name = splitted[0]
        if name in self.aliases:
            if isinstance(self.aliases[name], str):
                return self.aliases[name] + ' ' + ' '.join(splitted[1:])
            else:
                return self.aliases[name](' '.join(splitted[1:]))
        else:
            return cmd
    def prompt(self):
        ''' return raw_input(...)
        returns None at end of input (which ends mainloop) and True on ^C
        (which makes mainloop prompt again).
        '''
        try:
            return raw_input(os.path.expanduser('~').split(os.sep)[-1] + ' ' + os.path.abspath(os.curdir) + ' $ ')
        except EOFError:
            sys.stdout.write('\n')
            return
        except KeyboardInterrupt:
            sys.stdout.write('\n')
            return True
    def mainloop(self):
        ''' read and run commands until prompt() returns something false.
        '''
        cmd = self.prompt()
        while cmd:
            if isinstance(cmd, (str, unicode)):
                self(cmd)
            cmd = self.prompt()
    def __call__(self, cmd):
        ''' process a command, either by calling an os function,
        or passing cmd to os.system [possibly after modification by self.aliases].
        '''
        os.system(self.unalias(cmd))
def Print(*a):
    ''' print each argument on a line of its own.
    '''
    for i in a:
        print(i)
def Exec(__x):
    ''' exec the code __x, then copy this function's locals into the caller's local namespace.
    '''
    exec(__x)
    caller = sys._getframe(1)
    caller.f_locals.update(locals())
def For(__For_init, __For_test, __For_end, __For_body='pass', __For_terminate_scope=False):
    ''' a C-style for loop:
    For('i=0','i<10','i++','print i')
    For('i = 10', 'i > 0', '', """
    if raw_input(str(i)): i -= 1
    """)
    For('', 'raw_input()', '', 'print "hi!"')
    # if the init and end statements are omitted, this is essentially a while loop.
    the post-*crement ('i++', 'i--') unary operator is only supported in the end-loop statement.
    (it's replaced with '+=1', '-=1'.)
    the test expression cannot be empty, but the init and end statements can. the body statement[s] can be 'pass'
    expert users can create self-modifying code (without using the dis module!):
    just play with the multi-line string __For_body,
    the expression __For_test,
    and the statement __For_end
    every piece is executed with the calling frame's globals and locals, so
    variables created by the loop live in the caller's namespace.
    __For_terminate_scope is accepted for compatibility and has no effect: this
    function has no locals of its own apart from its '__For_'-prefixed ones.
    all in all, you might as well just use a while loop.
    '''
    __For_frame = sys._getframe(1)
    # fetch both namespaces once, and use the same pair for every piece of the loop
    __For_globals = __For_frame.f_globals
    __For_locals = __For_frame.f_locals
    __For_end = __For_end.replace('++', '+=1').replace('--', '-=1')
    exec(__For_init, __For_globals, __For_locals)
    while eval(__For_test, __For_globals, __For_locals):
        exec(__For_body, __For_globals, __For_locals)
        exec(__For_end, __For_globals, __For_locals)
def Lambda(nym='__Lambda_', args='', body='pass', buffered=False):
    ''' define a function from source strings and return it.
        nym       name given to the function.
        args      source text of the parameter list.
        body      source text of the function body; may span several lines.
        buffered  when true, the body is wrapped in try/except and the function
                  returns the caught exception instead of raising it.
        the definition is exec'd in the calling frame's locals, which also serve
        as the new function's globals.
    '''
    locls = sys._getframe(1).f_locals
    # splitlines copes with \n, \r\n and \r; an empty body becomes 'pass'
    lines = body.splitlines() or ['pass']
    src = ['def %s(%s):' % (nym, args)]
    if buffered:
        src.append('    try:')
        src.extend(['        ' + line for line in lines])
        # the handler must start on its own line, otherwise the source is a syntax error
        src.append('    except Exception:')
        src.append('        import sys')
        src.append('        return sys.exc_info()[1]')
    else:
        src.extend(['    ' + line for line in lines])
    exec('\n'.join(src) + '\n', locls)
    return eval(nym, locls)
def Class(nym='__Class_', super_meta='', body='pass'):
    ''' define a class from source strings and return it.
        nym         name given to the class.
        super_meta  source text of the base-class list; '' or a non-str means no bases.
        body        source text of the class body; may span several lines.
        the definition is exec'd in the calling frame's locals.
    '''
    locls = sys._getframe(1).f_locals
    if isinstance(super_meta, str) and super_meta != '':
        header = 'class %s(%s):' % (nym, super_meta)
    else:
        header = 'class %s:' % nym
    # splitlines instead of a regex that can match the empty string; an empty body becomes 'pass'
    lines = body.splitlines() or ['pass']
    exec('\n'.join([header] + ['    ' + line for line in lines]) + '\n', locls)
    return eval(nym, locls)
def TryExcept(trybody, exceptn='Exception, e', exceptbody='pass'):
    ''' run a try/except statement assembled from source strings.
        trybody     source text of the try suite.
        exceptn     source text placed after the except keyword.
        exceptbody  source text of the handler suite.
        everything is exec'd with the calling frame's globals and locals.
    '''
    f = sys._getframe(1)
    src = ['try:']
    src.extend(['    ' + line for line in (trybody.splitlines() or ['pass'])])
    # the except clause needs its own line; it used to be appended to the last try line
    src.append('except %s:' % exceptn)
    src.extend(['    ' + line for line in (exceptbody.splitlines() or ['pass'])])
    exec('\n'.join(src) + '\n', f.f_globals, f.f_locals)
def In(a, b):
    ''' function form of the "in" operator: is a a member of b?
    '''
    return operator.contains(b, a)
def PrintErr(*a):
    ''' like Print, but writes each argument on its own line to sys.stderr.
    '''
    for item in a:
        print >> sys.stderr, item
# lower-case, underscore-suffixed aliases for the capitalised keyword-like helpers
print_, exec_, for_, lambda_, class_, tryexcept_, in_, printerr_ = Print, Exec, For, Lambda, Class, TryExcept, In, PrintErr
# puts: another name for Print
puts = print_
# two-argument function forms of the binary operators + * / - % **, each built by eval'ing a lambda
__add__, __mul__, __div__, __sub__, __mod__, __pow__ = tuple(
    eval('lambda a,b:a%sb' % symbol) for symbol in ('+', '*', '/', '-', '%', '**'))
def lebesgue_norm(p, lst):
    ''' (mean of |x| ** p over lst) ** (1 / p).
    '''
    # a real list is built because the module's sum may be the list-based replacement
    powered = [float(abs(x)) ** p for x in lst]
    return (sum(powered) / float(len(lst))) ** (1.0 / p)
def intersect(a, b):
    ''' intersect(a, b) -> list of the items of a that are also in b, in a's order.
        when every item is hashable a dict lookup is used [O(size(a) + size(b))];
        otherwise it falls back to linear membership tests [O(size(a) * size(b))].
    '''
    try:
        lookup = dict.fromkeys(b, 1)
        common = [item for item in a if item in lookup]
    except TypeError:
        # something was unhashable
        common = [item for item in a if item in b]
    return common
def mean(L):
    ''' arithmetic mean of the sequence L, as a float.
    '''
    return sum(L) / float(len(L))
def variance(L):
    ''' population variance of L: the mean squared distance from mean(L).
    '''
    center = mean(L)
    squares = [((v - center) ** 2) for v in L]
    return sum(squares) / float(len(L))
def stddev(L):
    ''' population standard deviation of L.
    '''
    return math.sqrt(variance(L))
def flatten(L):
    ''' return a flat list of the leaves of L, descending into nested tuples and lists.
    '''
    flat = []
    for item in L:
        if not isinstance(item, (tuple, list)):
            flat.append(item)
            continue
        flat += flatten(item)
    return flat
def mapdict(f, d):
    ''' return a new dict with the keys of d and f applied to each value.
    '''
    return dict([(key, f(val)) for key, val in d.items()])
def whose(o, *a, **kw):
    ''' set several items [if o is a dict] or attributes [otherwise] on o, then return o.
        positional arguments are paired up with chunk(a, 2); keyword arguments add more pairs.
    '''
    pairs = chunk(a, 2) + kw.items()
    if isinstance(o, dict):
        setter = o.__setitem__
    else:
        setter = o.__setattr__
    map(setter, *unzip(pairs))
    return o
def curry(fn, *a1, **kw1): # pep 309
    ''' partial application: return a callable that prepends a1 to its positional
        arguments and merges kw1 under its keyword arguments before calling fn.
    '''
    def call_fn(*a2, **kw2):
        merged = dict(kw1)
        merged.update(kw2) # call-time keywords win
        return fn(*(a1 + a2), **merged)
    return call_fn
partial = curry
def convolve(*fs):
    ''' function composition: convolve(f, g)(x) == f(g(x)).
        the last function receives the call's arguments; each earlier one
        receives the previous result.
    '''
    ordered = list(fs)[::-1]
    def newfunc(*a, **kw):
        result = ordered[0](*a, **kw)
        for step in ordered[1:]:
            result = step(result)
        return result
    return newfunc
# seven_seg(z): builds three text rows, one per u in (3, 14, 10). for every character a of z,
# int(a) indexes the packed table string, ord(...) % u gives an offset into the glyph string,
# and three characters are taken from that offset. each row ends with a newline.
# NOTE(review): presumably renders a string of decimal digits as seven-segment ASCII art -- confirm;
# the string literals may have lost whitespace, and the table literal includes its own quote characters.
seven_seg = lambda z: ''.join([''.join([' _ |_|_ _| |'[ord("'\xa5\x8f\xb1\xdb\xad\xbdi\x03K\x9f'"[int(a)]) % u:][:3] for a in z]) + "\n" for u in (3, 14, 10)])
def prod(L):
    ''' product of the items of L [which must not be empty].
    '''
    return reduce(operator.mul, L)
# closure-based implementation of 333's cons, car, cdr
def cons(a, d):
    ''' build a pair: a function that hands a and d to whatever selector it is given.
    '''
    return lambda c: c(a, d)
def car(c):
    ''' first element of a pair built by cons.
    '''
    return c(lambda a, d: a)
def cdr(c):
    ''' second element of a pair built by cons.
    '''
    return c(lambda a, d: d)
class Namespace:
    ''' attribute-style view of a dict: the dict becomes the instance's __dict__.
    '''
    def __init__(self, d=globals()):
        # the default is evaluated once, at definition time: this module's globals
        self.__dict__ = d
def pygets(f, search_desc=False, prettyprint=True):
    ''' search the cheeseshop package index page for packages matching f.
        f            text to look for [case-insensitive] in the first captured group.
        search_desc  when true, the third captured group is searched as well.
        prettyprint  when true, each hit is printed and nothing is returned;
                     otherwise the list of (group0, group1, group2) tuples is returned.
        NOTE(review): the groups are presumably name, version and description -- confirm.
        NOTE(review): the pattern below looks damaged ["(?P[" is missing its group names],
        so re will presumably reject it -- restore it from the original source.
    '''
    f = f.lower()
    r = []
    # the whole index page is downloaded on every call
    for p in re.finditer(' (?P[^&]+) (?P[^<]*) | \n (?P[^<]+) | ',
            urllib.urlopen('http://cheeseshop.python.org/pypi?%3Aaction=index').read()):
        if (f in p.groups()[0].lower()) or (search_desc and (f in p.groups()[2].lower())):
            r.append((p.groups()[0], p.groups()[1], p.groups()[2]))
            if prettyprint: print r[-1][0], r[-1][1], '\t', r[-1][2]
    if not prettyprint: return r
def minmax(L):
    ''' (smallest, largest) item of L.
    '''
    return (min(L), max(L))
def minavgmax(L):
    ''' (smallest, float mean, largest) of L.
    '''
    lowest = min(L)
    average = 1.0 * sum(L) / len(L)
    highest = max(L)
    return (lowest, average, highest)
def gcf(a, b):
    ''' greatest common factor of a and b, by Euclid's algorithm.
        gcf(a, 0) is a [it used to raise ZeroDivisionError].
        the sign of the result follows Python's % operator, as before.
    '''
    while b:
        a, b = b, a % b
    return a
gcd = gcf # greatest common divisor
def lcm(*a):
    ''' least common multiple of the numeric arguments.
        lcm(x) is x; lcm() returns None.
    '''
    assert all([isinstance(i, (int, float, long)) for i in a])
    if len(a) == 1:
        return a[0]
    elif len(a) == 2:
        return a[0] * a[1] / gcf(a[0], a[1])
    elif len(a) > 2:
        # fold pairwise; reduce takes the sequence itself, not its unpacked items
        return reduce(lcm, a)
def setpriority(pid=None,priority=1):
    ''' set the priority class of a Windows process [needs the win32 extensions].
        pid       process id; None means the current python process.
        priority  index 0-5 into the classes below, lowest first; 2 is normal priority.
    '''
    import win32api,win32process,win32con
    priorityclasses = [
        win32process.IDLE_PRIORITY_CLASS,
        win32process.BELOW_NORMAL_PRIORITY_CLASS,
        win32process.NORMAL_PRIORITY_CLASS,
        win32process.ABOVE_NORMAL_PRIORITY_CLASS,
        win32process.HIGH_PRIORITY_CLASS,
        win32process.REALTIME_PRIORITY_CLASS,
    ]
    if pid is None:
        pid = win32api.GetCurrentProcessId()
    wanted = priorityclasses[priority]
    handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid)
    win32process.SetPriorityClass(handle, wanted)
def sumcos(f, X, fs, dur):
    ''' synthesize a sum of cosine waves; returns a list of int(fs * dur) float samples.
        f   = vector of frequencies
        X   = vector of complex numbers [phasors], one per frequency
        fs  = sampling rate in Hz
        dur = total time duration
    '''
    assert len(f) == len(X)
    # per-sample complex exponent of every component
    exponents = [(math.pi * 2.0j * freq / fs) for freq in f]
    components = range(len(f))
    samples = []
    for t in range(int(fs * dur)):
        terms = [(X[k] * (math.e ** (t * exponents[k]))).real for k in components]
        samples.append(sum(terms))
    return samples
def depth(L):
    ''' nesting depth of L: 0 when empty, 1 for a flat sequence, and one more
        for every level of nested tuple/list.
    '''
    deepest = 0
    for e in L:
        if isinstance(e, (tuple, list)):
            candidate = depth(e) + 1 # the container itself counts as a level
        else:
            candidate = 1
        deepest = max(deepest, candidate)
    return deepest
def frangex(start=None, end=None, step=None):
    ''' xrange for floats, as a generator.
        frangex()               counts from 0.0 up to [not including] 10
        frangex(end)            counts from 0.0 up to end
        frangex(start, end)     uses a step of 1.0
        frangex(start, end, step)
        a negative step counts down; a zero step yields nothing.
        Examples:
            for t in frangex(0, 10, 1./440): print math.cos(t)
            for t in frangex(-10, 10, 1./1000): print math.sin(t)
    '''
    if start is None:
        start = 10
    if step is None:
        step = 1.0
    if end is None:
        # single-argument form: the argument is the end, counting starts at zero
        end, start = start, 0.0
    current = start
    while ((step > 0) and (end > current)) or ((step < 0) and (end < current)):
        yield current
        current += step
def frange(*args):
    ''' list-returning version of frangex, like good ol' range.
    '''
    return list(frangex(*args))
def frangenx(*args):
    ''' N-dimensional frangex -- a float generator over multiple dimensions.
        every argument is a tuple/list of frangex arguments, one per dimension,
        ordered outer to inner [the last one varies fastest].
        a single tuple/list argument is unpacked and used as the argument list.
        purely numeric arguments mean one-dimensional mode: same as frangex(*args).
        with no arguments at all, frangenx(10) is assumed.
        Examples:
            frangenx(1, 100, 3) == frangex(1, 100, 3)
            for x, y, z in frangenx((100, ), (200, ), (300, )):
                # visit each point in 3d space raster-style
                print x, y, z
    '''
    if len(args) == 0:
        args = (10,)
    elif len(args) == 1 and isinstance(args[0], (tuple, list)):
        args = args[0]
    if all([isinstance(arg, (float, int, long)) for arg in args]):
        # all args are numeric -- one-dimensional mode
        for value in frangex(*args):
            yield value
        return
    innermost = args[-1]
    for outer in frangenx(*args[:-1]):
        # a one-dimensional outer range yields bare numbers; wrap them so they concatenate
        if isinstance(outer, tuple):
            prefix = outer
        else:
            prefix = (outer,)
        for inner in frangex(*innermost):
            yield prefix + (inner,)
def frangen(*args):
    ''' list-returning version of frangenx.
        Returns a list of tuples/floats like
        two coconuts being banged together.
        Example:
            frangen((1,10,3),(2,20,6)) == [(x,y) for x in frangex(1,10,3) for y in frangex(2,20,6)]
    '''
    return list(frangenx(*args))
def frangencx(bound, n_vars):
    ''' constant-bound n-dimensional float range: n_vars dimensions, each with
        frangex arguments [bound], passed to frangenx as one list.
    '''
    dimensions = [[bound]] * n_vars
    return frangenx(dimensions)
class DummyDict:
    ''' a dict-like black hole: stores are ignored, lookups give another DummyDict,
        get/setdefault give the default, iteration is empty and str/repr are ''.
    '''
    def __getitem__(self, name):
        return DummyDict()
    def __setitem__(self, name, value):
        pass
    def update(self, d):
        pass
    def get(self, name, default=None):
        return default
    setdefault = get
    def __iter__(self):
        return iter(())
    def __str__(self):
        return ''
    __repr__ = __str__
def match_paren(s, i, max_time=1e9):
    ''' return the index of the paren matching the one at s[i].
        supported parens are "<>{}[]()"; an opening paren is searched forward, a
        closing one backward, and only parens of the same kind affect nesting.
        max_time is the number of chars i'll traverse before giving up.
        returns None if s[i] is not a paren, if there is no match, or if
        max_time runs out.
    '''
    openers, closers = '([{<', ')]}>'
    paren = s[i]
    if paren in openers:
        direction = 1 # search forward for the brother
        partner = closers[openers.index(paren)]
    elif paren in closers:
        direction = -1 # search backward
        partner = openers[closers.index(paren)]
    else:
        return None # "paren" is not a paren
    if i < 0:
        i += len(s) # normalise a negative index so the bounds check below is meaningful
    nest_level = 1
    iter_time = 0
    while True:
        i += direction
        iter_time += 1
        # stop at either end of s instead of wrapping around or reporting the edge as a match
        if (iter_time > max_time) or not (0 <= i < len(s)):
            return None
        if s[i] == paren:
            nest_level += 1
        elif s[i] == partner:
            nest_level -= 1
            if nest_level == 0:
                return i
def collapse_whitespace(s):
    ''' replace every run of whitespace in s with a single space.
    '''
    return re.compile('\s+').sub(' ', s)
def lif(condition, yes, no):
    ''' lispy if, aka ternary operator: yes when condition is true, else no.
        both branches are ordinary arguments, so both are evaluated by the caller.
    '''
    return yes if condition else no
def cond(*conditions, **else_clause):
    ''' lispy cond: return the value paired with the first true condition.
        cond(
            ((1 < 0), '1 < 0'),
            ((2 > 3), '2 > 3'),
            ((1 < 2), 'happy'),
            els='else value'
        )
        when no condition is true, the keyword argument els is returned [None if omitted].
    '''
    for pair in conditions:
        condition, value = pair
        if condition:
            return value
    return else_clause.get('els')
def lif(condition, yes, no):
    ''' lispy if expressed through cond: yes when condition is true, else no.
    '''
    branches = ((condition, yes),)
    return cond(*branches, **{'els': no})
class EventQueue(threading.Thread):
    ''' queues function calls and runs them one at a time on its own thread.
        the thread starts as soon as the instance is created and runs until stop().
        attributes:
            queue     list of pending (function, args, kwargs) entries, oldest first.
            nap_time  seconds to sleep when the queue is empty.
            running   loop flag; cleared by stop().
    '''
    def __init__(self):
        threading.Thread.__init__(self, target=self)
        self.queue = []
        self.nap_time = .1
        # set before start(): otherwise a stop() issued right after construction
        # could be overwritten when the worker gets going
        self.running = True
        self.start()
    def error(self, error, function, a=(), kw={}):
        ''' report an exception raised by a queued call; override to customise.
        '''
        print >> sys.stderr, 'EventQueue event raised exception(', function, a, kw, '):', error
    def push(self, function, a=(), kw={}):
        ''' schedule function(*a, **kw) to be called by the worker thread.
        '''
        self.queue.append( (function, a, kw) )
    def stop(self):
        ''' ask the worker loop to finish; a call already in progress completes first.
        '''
        self.running = False
    def __call__(self):
        ''' the worker loop: continually calls functions as specified in self.queue.
            exceptions raised by a call are passed to self.error and do not end the loop.
        '''
        while self.running:
            if len(self.queue) == 0:
                time.sleep(self.nap_time)
            else:
                function, a, kw = self.queue.pop(0)
                try:
                    function(*a, **kw)
                except Exception:
                    self.error(sys.exc_info()[1], function, a, kw)
# printable ascii characters usable as xpm pixel codes: everything except '"' and '\'
xpm_data_chars = ''.join(map(chr, range(33, 127))).replace('"','').replace('\\', '')
xpm_data_char = lambda n: xpm_data_chars[int(n)]
n_xpm_data_chars = len(xpm_data_chars)
def xpm(color_func, w=100, h=100, name='faulkner'):
    ''' render an image as XPM source text.
        color_func(x, y) is called for every pixel, with the coordinates centred on
        the image [x runs over range(-w//2, w//2), likewise y], and must return
        the pixel colour as a hashable value that formats as hex digits, eg 'rrggbb'.
        w, h    image size in pixels.
        name    name of the C array in the generated source.
        returns the XPM text as one string.
    '''
    rgb_rows = [] # list of lists of rrggbb strings
    colors = [] # list of unique colors, in order of first appearance
    seen = {} # membership lookup for colors
    for y in range(-h//2, h//2):
        row = []
        for x in range(-w//2, w//2):
            pixel = color_func(x, y)
            row.append(pixel)
            if pixel not in seen:
                seen[pixel] = True
                colors.append(pixel)
        rgb_rows.append(row)
    n_colors = len(colors)
    # characters per pixel: the smallest length that gives every color its own code
    code_length = 1
    while (n_xpm_data_chars ** code_length) < n_colors:
        code_length += 1
    lines = [
        '/* XPM */',
        'static char * %s[] = {' % name,
        '"%d %d %d %d",' % (w, h, n_colors, code_length),
    ]
    color_map = {} # {'ffffff' : '...'}
    for i, color in enumerate(colors):
        # the code is i written in base n_xpm_data_chars, most significant digit first
        digits = []
        rest = i
        for _ in range(code_length):
            rest, digit = divmod(rest, n_xpm_data_chars)
            digits.append(xpm_data_char(digit))
        digits.reverse()
        code = ''.join(digits)
        color_map[color] = code
        lines.append('"%s\tc #%s",' % (code, color))
    for row in rgb_rows:
        lines.append('"%s",' % ''.join([color_map[rgb] for rgb in row]))
    lines.append('};')
    return '\n'.join(lines)
def tuple_to_hex(s, chunk_len=2):
    ''' convert a sequence of channel values in [0.0, 1.0] to a hex string.
        every value becomes int(256 * x) clamped to 0..255, written in lowercase
        hex and zero-padded to chunk_len digits: tuple_to_hex((1.0, .5, 0)) == 'ff8000'
    '''
    parts = []
    for x in s:
        # clamp: 1.0 would otherwise become '100', and negatives would leak a stray sign
        level = max(0, min(255, int(256 * x)))
        parts.append(('%x' % level).zfill(chunk_len))
    return ''.join(parts)
def test_xpm_color_func(x, y):
    ''' sample color function for xpm: the hue grows with the distance of (x, y) from
        the origin [one full turn every 100 pixels] at saturation .5 and value .5.
        returns the color via tuple_to_hex.
    '''
    # imported here because only the names *inside* colorsys are star-imported at the top of the file
    import colorsys
    hue = (math.sqrt((x ** 2) + (y ** 2)) / 100.0) % 1.0
    return tuple_to_hex(colorsys.hsv_to_rgb(hue, .5, .5))
def subclassconstructor(args='()', kwargs='{}'):
    ''' decorator generator for subclass constructors.
        the decorated __init__ runs first; then superclass.__init__(self, *eval(args), **eval(kwargs))
        is called, where superclass is self.__class__.__base__ and the two strings
        are evaluated in the frame that called the constructor.
        usable as @subclassconstructor('(1, 2)', "{'k': 3}") or bare as @subclassconstructor.
        returns None when args is neither a string nor a function.
        NOTE(review): because the base is looked up on self.__class__, a grandchild class
        inheriting the decorated __init__ would presumably recurse -- confirm before relying on it.
    '''
    string_types = (str, type(u''))
    bare = isinstance(args, (types.FunctionType, types.MethodType))
    # a separate name: assigning to args inside the closure made it local there and
    # raised UnboundLocalError on every call
    if isinstance(args, string_types):
        args_source = args
    else:
        args_source = '()'
    def decorator(old__init__):
        def new__init__(self, *a, **kw):
            old__init__(self, *a, **kw)
            frame = sys._getframe(1)
            base_args = eval(args_source, frame.f_globals, frame.f_locals)
            base_kwargs = eval(kwargs, frame.f_globals, frame.f_locals)
            # self must be passed explicitly: this is the class's function, not a bound method
            self.__class__.__base__.__init__(self, *base_args, **base_kwargs)
        new__init__.__doc__ = old__init__.__doc__
        return new__init__
    if isinstance(args, string_types):
        return decorator
    elif bare:
        return decorator(args)
def kwarg_saver(old__init__):
    ''' decorator for __init__: stores every keyword argument as an instance
        attribute before running the original constructor.
    '''
    def new__init__(self, *a, **kw):
        vars(self).update(kw)
        old__init__(self, *a, **kw)
    new__init__.__doc__ = old__init__.__doc__
    return new__init__
class ChildDict(dict):
    ''' ensure that sub-dicts let their parent dicts know that they've been changed [so the parent dicts can, say, update a pickle file],
        and that self.update is recursive, so inner dicts are updated instead of overwritten.
        an instance must be bound to its parent by calling it, instance(parent, name), before it is modified.
        eg:
        >>> class ParentDict(dict):
        ...     def __getitem__(self, name):print name;return dict.__getitem__(self, name)
        ...     def __setitem__(self, name, value):print name, value;dict.__setitem__(self, name, value)
        ...
        >>> family_dict = ParentDict()
        >>> family_dict['a'] = ChildDict({'b':{'c':'c'}})(family_dict, 'a')
        a {'b':{'c':'c'}}
        >>> family_dict['a'].update({'b':{'d':'d'}})
        a
        NOTE(review): a second class named ChildDict is defined later in this file and replaces this one.
    '''
    def __repr__(self):
        return 'ChildDict(' + dict.__repr__(self) + ')'
    def __setitem__(self, name, new_value, update=True):
        ''' tell parent that i was changed.
            preserve dicts i contain by updating them instead of overwriting them.
            update=False forces a plain overwrite [used for parent notifications].
        '''
        if update and (name in self) and isinstance(self[name], dict) and isinstance(new_value, dict):
            # merge into the existing inner dict
            self[name].update(new_value)
        else:
            dict.__setitem__(self, name, new_value)
        # notify the parent; needs self.parent / self.name, which __call__ sets
        if isinstance(self.parent, ChildDict):
            # update=False so the parent stores me as-is instead of merging me into myself
            self.parent.__setitem__(self.name, self, False)
        else:
            self.parent[self.name] = self
    def __getitem__(self, name):
        ''' if i contain a dict, i'll transform it into a ChildDict, too. yay, recursion!
        '''
        r = dict.__getitem__(self, name)
        if isinstance(r, dict) and not isinstance(r, ChildDict):
            r = ChildDict(r)(self, name)
            # store the wrapper directly, bypassing __setitem__, so no notification is sent
            dict.__setitem__(self, name, r)
        return r
    def __call__(self, parent, name):
        ''' since i can't overload dict.__init__, you must call this method before you modify me.
            return self, so you don't need to waste lines setting attributes.
        '''
        self.parent = parent
        self.name = name
        return self
    def update(self, d):
        ''' set every item of d through __setitem__, so inner dicts are merged and the parent is notified.
        '''
        for k, v in d.iteritems():
            self[k] = v
def nested_getattr(obj, attr_names):
    ''' follow a chain of attribute names: nested_getattr(o, ['a', 'b']) is o.a.b.
        an empty chain returns obj itself.
    '''
    for attr_name in attr_names:
        obj = getattr(obj, attr_name)
    return obj
def nested_getitem(obj, item_names, default=None):
    ''' follow a chain of keys through nested mappings using .get(key, default).
        an empty chain returns obj itself.
    '''
    for item_name in item_names:
        obj = obj.get(item_name, default)
    return obj
def nested_setattr(obj, attr_names, value):
    ''' set the attribute at the end of a chain of names: nested_setattr(o, ['a', 'b'], v)
        does o.a.b = v. missing intermediate attributes are created with empty_namespace().
    '''
    target = obj
    for attr_name in attr_names[:-1]:
        if not hasattr(target, attr_name):
            setattr(target, attr_name, empty_namespace())
        target = getattr(target, attr_name)
    setattr(target, attr_names[-1], value)
# module-level shorthand for math.pi
pi = math.pi
class Tree(list):
    ''' a list of [possibly nested] lists whose nodes can be addressed by a path:
        a list or tuple of indices, one per nesting level.
        plain integer/slice indexing behaves as for list.
        NOTE(review): the from_lisp, from_xml and from_py parsers look unfinished [see the notes in each].
    '''
    def __setitem__(self, path, value):
        ''' modify tree [list of lists] IN PLACE, appending empty lists or overwriting non-lists with empty lists where needed.
            a path that is not a list/tuple is treated as an ordinary list index.
            >>> tree = Tree([[['a', 'b'], 'c'], 'd'])
            >>> tree[[0, 0, 0]] = 'z'
            >>> assert tree[0][0][0] == 'z'
            NOTE(review): the last path element is assigned with list.__setitem__ without padding,
            and an empty path indexes path[0], so IndexError is still possible.
        '''
        if not isinstance(path, (list, tuple)):
            list.__setitem__(self, path, value)
        elif len(path) > 1:
            # pad with empty lists until path[0] is a valid index
            while (len(self) - 1) < path[0]:
                self.append([])
            if not isinstance(self[path[0]], list):
                # overwrite non-list with empty list
                list.__setitem__(self, path[0], [])
            # recurse on a Tree copy of the child, then store that copy back
            new_tree = Tree(self[path[0]])
            new_tree[path[1:]] = value
            list.__setitem__(self, path[0], new_tree)
        else:
            list.__setitem__(self, path[0], value)
    @staticmethod
    def from_lisp(s, parens=('(', ')'), string_quotes='"', not_strings = ' \t\r\n', escapes='\\', prep={"'":'quote', }):
        ''' parse a bit of lisp code into a tree.
            Tree.from_lisp('(a (b c d) e "f \" f" (g (h (i))') == ['a', ['b', 'c', 'd'], 'e', 'f " f', ['g', ['h', ['i']]]]
            closes unmatched parens at the end.
            raise TypeError if not isinstance(s, (str, unicode))
            NOTE(review): unfinished. the loop reads aparens, zparens and quotes, none of which is
            defined in this method [the parameters are parens and string_quotes]; it sets
            escaped_in_string but tests escaped_in_quotes; it adds 1 to tree[path] where a
            character was presumably meant to be appended; prep is never used.
        '''
        tree = Tree()
        path = []
        if not isinstance(s, (str, unicode)):
            raise TypeError('need a string to parse')
        in_quotes = False
        escaped_in_quotes = False
        in_string = False
        for c in s:
            # XXX
            if in_quotes:
                if escaped_in_quotes:
                    tree[path] += 1
                    escaped_in_string = False
                else:
                    if c in escapes:
                        escaped_in_string = True
                    else:
                        tree[path] += 1
            elif in_string:
                if c in not_strings:
                    path.pop(-1)
                else:
                    tree[path] += 1
            elif c in aparens:
                if isinstance(tree[path], (str, unicode)):
                    path.pop(-1)
                tree[path].append(Tree())
                path.append(0)
            elif c in zparens:
                path.pop(-1)
            elif c in quotes:
                in_quotes = True
                tree[path].append('')
                path[-1] += 1
            elif c in not_strings:
                continue
            else:
                in_string = True
                if isinstance(tree[path], (str, unicode)):
                    path.pop(-1)
                tree[path].append('')
        return tree
    @staticmethod
    def from_xml(s, attrs_to_items=False, kill_attrs=False):
        ''' Tree.from_xml('cg') == Tree([
                ['a', {}, ['b', {}, 'c'], ['d', {'e':'f'}, 'g']], ['a', {}]
            ])
            Tree.from_xml('cg', True) == Tree([
                ['a', [], ['b', [], 'c'], ['d', [('e', 'f')], 'g']], ['a', []]
            ])
            Tree.from_xml('cg', False, True) == Tree([
                ['a', ['b', 'c'], ['d', {'e':'f'}, 'g']], ['a']
            ])
            s may be a string or a file object; anything else raises TypeError.
            NOTE(review): the example inputs above appear to have lost their xml markup.
            NOTE(review): unfinished. every start_element variant only prints its arguments and
            records the tag name, nothing is ever added to tree [so tree[0] will raise
            IndexError], and path and _start_element are unused.
        '''
        from xml.parsers.expat import ParserCreate
        tree = Tree()
        path = []
        names = [] # stack of currently open tag names
        parser = ParserCreate()
        if attrs_to_items:
            def start_element(name, attrs={}):
                print name, attrs
                names.append(name)
        else:
            def start_element(name, attrs={}):
                print name, attrs
                names.append(name)
        if kill_attrs is True:
            # kill all attrs
            _start_element = start_element
            def start_element(name, attrs={}):
                print name, attrs
                names.append(name)
        elif kill_attrs is None:
            # kill only empty attrs
            _start_element = start_element
            def start_element(name, attrs):
                print name, attrs
                names.append(name)
        parser.StartElementHandler = start_element
        def end_element(name):
            ''' forgive unclosed tags.
            '''
            # pop open tags until the one being closed has been removed
            while name != names.pop(): pass
        parser.EndElementHandler = end_element
        if isinstance(s, file):
            s = s.read()
        elif not isinstance(s, (str, unicode)):
            raise TypeError('need either string or file')
        parser.Parse('%s' % s) # the expat parser can't handle more than 1 root element
        return tree[0]
    @staticmethod
    def from_py(s):
        ''' parse an indentation-colon-delimited tree [eg python source, or my todo.txts].
            Tree.from_py("""
            test
            root:
            fred
            foo bar
            baz:
            chicken
            liver
            want:
            meowmix
            please
            deliver
            test
            """) == Tree(
                ['test',
                    ['root', 'fred ', 'foo bar',
                        ['baz', 'chicken', 'liver',
                            ['want', 'meowmix'],
                            'please'
                        ],
                        'deliver'
                    ],
                    'test'
                ]
            )
            replaces all tabs on the left with 4 spaces prior to parsing.
            raise TypeError if not isinstance(s, (str, unicode, list, tuple, file))
            raise SyntaxError if dedentation doesn't match a previous indentation, or if you mix tabs with spaces.
            NOTE(review): the example input above appears to have lost its indentation.
            NOTE(review): unfinished. indentations is never used, "assert 'XXX'" always passes,
            no line text is stored, and the tab replacement and dedentation check described
            above are not present in the code.
        '''
        indentations = [] # increasing sequence of ints
        tree = Tree([[]])
        path = [0] # the latest string/Tree added is tree[path]
        tabs_or_spaces = None
        assert_indent = False
        if isinstance(s, (str, unicode)):
            s = re.split('\r?\n?', s)
        elif not isinstance(s, (list, tuple, file)):
            raise TypeError('need one of string, list, tuple, or file')
        for line in s:
            if tabs_or_spaces is None:
                # the first line decides which indentation character is recorded
                tabs_or_spaces = [' ', '\t'][int(bool(re.match('\s*\t\s*', line)))]
            elif re.match('\s*%s\s*' % tabs_or_spaces, line):
                raise SyntaxError('do not mix tabs with spaces')
            if assert_indent:
                assert 'XXX'
            if line.rstrip().endswith(':'):
                if isinstance(tree[path], (str, unicode)):
                    path.pop(-1)
                tree[path].append(Tree())
                path[-1] += 1
                assert_indent = True
            # XXX
        return tree
    def __repr__(self):
        return 'Tree(%s)' % list.__repr__(self)
    __str__ = __repr__
    def copy(self):
        ''' recursive copy: nested lists are copied, other elements are shared.
            NOTE(review): the result, and every nested copy, is a plain list, not a Tree.
        '''
        res = []
        for elem in self:
            if isinstance(elem, list):
                res.append(Tree(elem).copy())
            else:
                res.append(elem)
        return res
    # NOTE(review): "with" is a reserved word from Python 2.6 on, where this method name is a syntax error.
    def with(self, path, value):
        ''' functional programming sugar for __setitem__ which returns self.
        '''
        self[path] = value
        return self
    def __getitem__(self, path):
        ''' return the node of tree at path.
            a path that is not a list/tuple is treated as an ordinary list index; an empty path returns self.
            assumes path is a valid path. raises TypeError or IndexError if this is wrong.
            nested plain lists are returned as Tree copies.
            >>> assert Tree([[['a', 'b'], 'c'], 'd'])[[0, 0, 0]] == 'a'
        '''
        if not isinstance(path, (list, tuple)):
            return list.__getitem__(self, path)
        if len(path) == 0:
            return self
        elif len(path) == 1:
            res = self[path[0]]
            if isinstance(res, list) and not isinstance(res, Tree):
                return Tree(res)
            else:
                return res
        else:
            return Tree(self[path[0]])[path[1:]]
    def has_path(self, path):
        ''' determine whether path leads to a node.
            >>> assert Tree([[['a', 'b'], 'c'], 'd']).has_path([0, 0, 0])
            NOTE(review): an empty path raises IndexError [path[0] is read unconditionally].
        '''
        if path[0] > (len(self) - 1):
            return False
        elif len(path) > 1:
            if not isinstance(self[path[0]], list):
                return False
            else:
                return Tree(self[path[0]]).has_path(path[1:])
        else:
            # tree is at least as long as path[0] and len(path) == 1
            return True
    def contains(self, value):
        ''' determine whether value is anywhere in self.
            i don't want to replace list.__iter__, because i want to be able to choose between depth-first and breadth-first.
        '''
        for elem in self:
            if (elem == value) or (isinstance(elem, list) and Tree(elem).contains(value)):
                return True
        return False
    def path_to(self, node):
        ''' do a depth-first search for node in self.
            if node is not in tree, return None [which is an invalid path].
            >>> assert Tree([[['a', 'b'], 'c'], 'd']).path_to('a') == [0, 0, 0]
        '''
        for i, elem in enumerate(self):
            if (elem == node):
                return [i]
            elif isinstance(elem, list):
                test_path = Tree(elem).path_to(node)
                if test_path is not None:
                    return [i] + test_path
        return None
    def depth_first(self):
        ''' generate (path, node) pairs depth-first: each node is followed directly by its descendants.
        '''
        for i, first in enumerate(self):
            yield [i], first
            if isinstance(first, list):
                for j, second in Tree(first).depth_first():
                    yield ([i] + j), second
    def breadth_first(self):
        ''' generate (path, node) pairs: all of my direct children first, then each child's subtree in turn.
        '''
        for i, elem in enumerate(self):
            yield [i], elem
        for i, first in enumerate(self):
            if isinstance(first, list):
                for j, second in Tree(first).breadth_first():
                    yield ([i] + j), second
    def depth(self):
        ''' get the depth [greatest len(path)] of this tree.
            >>> assert Tree([[['a', 'b'], 'c'], 'd']).depth() == 3
        '''
        deepest = 0
        for elem in self:
            if isinstance(elem, (tuple, list)):
                e_depth = Tree(elem).depth() + 1 # without the "+ 1", nothing gets any bigger!
                if e_depth > deepest:
                    deepest = e_depth
            elif deepest == 0:
                deepest = 1
        return deepest
def nested_getattr(obj, attr_names):
    ''' resolve a chain of attribute names starting at obj; an empty chain gives obj.
    '''
    return reduce(getattr, attr_names, obj)
def nested_getitem(obj, item_names, default=None):
    ''' resolve a chain of keys through nested mappings with .get(key, default);
        an empty chain gives obj.
    '''
    return reduce(lambda node, key: node.get(key, default), item_names, obj)
def nested_setattr(obj, attr_names, value):
    ''' assign value to the attribute named by the last element of attr_names, reached
        through the attributes named before it; create empty_namespaces as needed.
    '''
    if len(attr_names) <= 1:
        setattr(obj, attr_names[0], value)
        return
    head, rest = attr_names[0], attr_names[1:]
    if not hasattr(obj, head):
        setattr(obj, head, empty_namespace())
    nested_setattr(getattr(obj, head), rest, value)
class UniqueIdentifier:
    ''' marker object whose str and repr are both the empty string.
    '''
    def __str__(self):
        return ''
    __repr__ = __str__
class HBV(BitVector):
    ''' stores a huge bitvector in a file.
        NOTE(review): placeholder -- nothing beyond the docstrings is implemented yet.
    '''
    def __init__(self, *args, **kwargs):
        ''' accepts any arguments and does nothing: this docstring is the entire body.
        '''
class Huffman:
    ''' encodes and decodes unicodes using HBV
        NOTE(review): placeholder -- nothing beyond the docstrings is implemented yet.
    '''
    def __init__(self, *args, **kwargs):
        ''' accepts any arguments and does nothing: this docstring is the entire body.
        '''
class classpropertytype(property):
    ''' pythonic property syntax ala
        http://www.z3lab.org/sections/blogs/philipp-weitershausen/2006_05_29_pycon-06-lightning-talk/tbping
        usage:
            class MyClass:
                class prop_name(classproperty):
                    def __get__(self):return 1
                    def __set__(self, val):print val
        the constructor takes the (name, bases, members) triple of a class statement
        and builds a property from the __get__, __set__, __delete__ and __doc__
        entries of members.
    '''
    def __init__(self, name, bases=(), members={}):
        accessors = [members.get(key) for key in ('__get__', '__set__', '__delete__', '__doc__')]
        return super(classpropertytype, self).__init__(*accessors)
classproperty = classpropertytype('classproperty')
def even(n):
    ''' is n divisible by 2?
    '''
    return (n % 2) == 0
def odd(n):
    ''' does n leave a remainder of 1 when divided by 2?
    '''
    return (n % 2) == 1
def synchronized(f):
    ''' function decorator which ensures that f is running in only ONE thread at a time.
        the wrapper returns whatever f returns and releases the lock even when f raises.
        the wrapper's locked attribute is True while f is running.
        the lock is not reentrant: f must not call its own wrapper.
    '''
    lock = threading.Lock()
    def newf(*a, **kw):
        # a real lock replaces the old check-then-set busy wait, which two threads could pass together
        lock.acquire()
        newf.locked = True
        try:
            return f(*a, **kw)
        finally:
            newf.locked = False
            lock.release()
    newf.locked = False
    return newf
def get_email(srvr=None, nym=None, passw=None):
    ''' login to pop3 server srvr using name nym and password passw,
        [prompting for whichever of them is None or the empty string],
        return a list of email.Message.Message instances, one per message on the server.
        the connection is closed even if something fails; all exceptions percolate up.
        to get common parts of messages:
            msg['DATE']
            msg['SUBJECT']
            msg['FROM']
            str(msg)
            str(msg._payload) # isinstance(msg._payload, (str, unicode, email.Message.Message))
            msg.get_content_type()
    '''
    # imported here so the function does not depend on the file's top-level imports
    import poplib
    import email.Parser
    # ask only for what is missing, keeping whatever the caller supplied
    if not srvr:
        srvr = raw_input('what is you pop3 server? ')
    if not nym:
        nym = raw_input('what is your login name? ')
    if not passw:
        passw = raw_input('what is your password? ')
    p = poplib.POP3(srvr)
    try:
        p.user(nym)
        p.pass_(passw)
        n_messages = len(p.list()[1])
        # pop3 message numbers start at 1
        msg_list = [email.Parser.Parser().parsestr('\n'.join(p.retr(i+1)[1])) for i in xrange(n_messages)]
    finally:
        p.quit()
    return msg_list
class ChildDict(dict):
    ''' a dict that reports every change to its parent mapping, so the parent can, say, update a pickle file.
        bind an instance to its parent by calling it, instance(parent, name), before modifying it.
    '''
    def __setitem__(self, name, new_value, update=True):
        ''' store new_value under name and tell parent that i was changed.
            when both the stored and the new value are dicts [and update is true],
            the stored dict is updated in place instead of being overwritten.
        '''
        merge = update and (name in self) and isinstance(self[name], dict) and isinstance(new_value, dict)
        if merge:
            self[name].update(new_value)
        else:
            dict.__setitem__(self, name, new_value)
        owner = self.parent
        if isinstance(owner, ChildDict):
            # update=False: the owner must store me as-is, not merge me into myself
            owner.__setitem__(self.name, self, False)
        else:
            owner[self.name] = self
    def __getitem__(self, name):
        ''' look name up; a plain dict found there is replaced by a ChildDict bound to me.
        '''
        found = dict.__getitem__(self, name)
        if not isinstance(found, dict) or isinstance(found, ChildDict):
            return found
        wrapped = ChildDict(found)(self, name)
        dict.__setitem__(self, name, wrapped)
        return wrapped
    def __call__(self, parent, name):
        ''' since i can't overload dict.__init__, you must call this function before you modify me.
            records my parent and the key i live under; returns self so it can be chained.
        '''
        self.parent = parent
        self.name = name
        return self
    def update(self, d):
        ''' set every item of d through __setitem__, merging inner dicts.
        '''
        for key, val in d.items():
            self[key] = val
class ParentDict(dict):
    ''' a dict that prints every key it is asked for and every key/value it is given.
    '''
    def __getitem__(self, name):
        print(name)
        found = dict.__getitem__(self, name)
        return found
    def __setitem__(self, name, value):
        print('%s %s' % (name, value))
        dict.__setitem__(self, name, value)
# demo objects built at import time: a ParentDict holding one ChildDict under the key 'a'
# [the assignment goes through ParentDict.__setitem__, so it prints the key and value]
family_dict = ParentDict()
family_dict['a'] = ChildDict({'b':{'c':'c'}})(family_dict, 'a')
def nest_dict(formd, prepend='', form_nestor='/'):
    ''' translate a flat dict [whose keys are broken by form_nestor] to a nested dict.
        nest_dict({'a/b/c': 1, 'e': 2}) == {'a': {'b': {'c': 1}}, 'e': 2}
        prepend is accepted but not used.
    '''
    nested = {}
    for flat_key, value in formd.items():
        parts = flat_key.split(form_nestor)
        node = nested
        # walk along the key parts into the nested dict, creating inner dicts as necessary
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return nested
def flatten_dict(d, prepend='', form_nestor='/'):
    ''' translate a nested dict to a flat dict [whose keys are broken by form_nestor].
        flatten_dict({'a':{'b':{'c':'c'}}}) == {'a/b/c':'c'}
        prepend is put in front of every key; the recursion uses it for the path so far.
    '''
    r = {}
    for key, value in d.items():
        if isinstance(value, dict):
            # pass the separator along: without it, deeper levels fell back to '/'
            r.update(flatten_dict(value, prepend + key + form_nestor, form_nestor))
        else:
            r[prepend + key] = value
    return r
def sum(L, start=None):
    ''' generalized from the builtin case for numbers: adds up anything that supports +.
        L      any iterable [lists, tuples, generators, ...].
        start  optional initial value, as for the builtin. when omitted, the first
               item is the starting point, so strings, lists etc. work too.
        returns 0 for an empty L when no start is given.
        the items of L are never modified.
    '''
    total = start
    pending_first = start is None
    for x in L:
        if pending_first:
            total = x
            pending_first = False
        else:
            # "+", not "+=": in-place addition would mutate a mutable first item
            total = total + x
    if pending_first:
        return 0
    return total
# the classic combinators: identity, constant, substitution and the fixed-point combinator
I = lambda x: x
K = lambda x, y: x
S = lambda x, y, z: x(z)(y(z))
# Y(X) returns the fixed point of X, which lets an anonymous function call itself
Y = lambda X: (lambda procedure: X(lambda arg: procedure(procedure)(arg)))(lambda procedure: X(lambda arg: procedure(procedure)(arg)))
Y.__doc__ = '''# -*- scheme -*-
(define (Y X)
(
(lambda (procedure)
(X
(lambda (arg)
(
(procedure procedure)
arg
)
)
)
)
(lambda (procedure)
(X
(lambda (arg)
(
(procedure procedure)
arg
)
)
)
)
)
)
'''
# fibonacci numbers via Y, with fib(0) == fib(1) == 1.
# written as a plain expression: the old eval'd string could not see fib_, because a name
# used only inside a string is not part of the inner lambda's locals()
fib = Y(lambda fib_: lambda x: 1 if x < 2 else fib_(x - 1) + fib_(x - 2))
def ceqr(name, __ceqr_secret_val__, assignment_operator='='):
    ''' emulates C's "=" [assignment] operator as an expression:
        assigns the value to [str] name in the calling namespace and returns the value.
        assignment_operator may name another assignment operator, eg '+='.
        the value is also left in the caller's namespace as __ceqr_secret_val__.
    '''
    assert isinstance(name, str)
    assert isinstance(assignment_operator, str)
    caller = sys._getframe(1)
    scope = caller.f_locals
    scope['__ceqr_secret_val__'] = __ceqr_secret_val__
    statement = '%s %s __ceqr_secret_val__' % (name, assignment_operator)
    exec(statement, caller.f_globals, scope)
    return __ceqr_secret_val__
def weighted_choice(*two_tuples):
    ''' pick one object at random from (probability, object) pairs.
        >>> weighted_choice(
        ...     (.5, most_common_object),
        ...     (.25, another_object)
        ... ) # returns None with probability .25
        >>> freq = [0 for i in range(10)]
        >>> for i in xrange(10000):freq[weighted_choice(*zip([.1,.1,.1,.2,0,.1,.1,.1,.2,0],range(10)))]+=1
        ...
        >>> freq
        [978, 934, 1019, 1943, 0, 1062, 1041, 964, 2059, 0]
        when the probabilities sum to less than 1, None is returned for the remainder.
    '''
    threshold = random.random()
    cumulative = 0
    for weight, candidate in two_tuples:
        cumulative += weight
        if cumulative > threshold:
            return candidate
    return None
def rand_word(minlength=5, maxlength=10, vowels='aeiouy', start_vowel_probability=.5):
    ''' make up a pronounceable lowercase word by alternating consonants and vowels.
        minlength, maxlength     bounds for the base length drawn at random; the word is built
                                 from two-letter pairs and may grow by up to five letters
                                 through the embellishments below.
        vowels                   letters treated as vowels; all other ascii lowercase
                                 letters are consonants.
        start_vowel_probability  chance that the word starts with a vowel.
    '''
    # ascii_lowercase: string.lowercase depends on the locale
    consonants = ''.join([c for c in string.ascii_lowercase if c not in vowels])
    if random.random() > start_vowel_probability:
        vowels, consonants = consonants, vowels
    # ceil returns a float, which range/xrange do not accept
    n_pairs = int(math.ceil(random.randrange(minlength, maxlength + 1) / 2.0))
    word = ''.join([
        random.choice(consonants) + random.choice(vowels)
        for i in range(n_pairs)
    ])
    # each embellishment is applied, to the first occurrence only, with probability one half
    for old, new in [
        ('g', 'gg'),
        ('d', 'nd'),
        ('k', 'nk'),
        ('q', 'qu'),
        (random.choice(consonants), 'r' + random.choice(consonants)),
    ]:
        if random.random() > .5:
            word = word.replace(old, new, 1)
    return word
# strftime/strptime pattern of the default output of anydatetime
ISO_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
def anydatetime(s, in_US=True, format=None):
    ''' take a string supposedly representing a date and/or time in no particular format,
        and return the date and time in ISO format or that specified.
        completely case independent.
        'yesterday', and 'tomorrow' are special days which can only be accompanied by times.
        'today' is optional if you only specify a time.
        all times in absolute mode must be specified by 'HH:MM:SS' or 'HH:MM'.
        'ago' is a keyword which must be preceded by alternations of numbers and strings in
        ['days', 'day', 'months', 'month', 'years', 'year', 'weeks', 'week', 'hour', 'hours', 'minutes', 'minute', 'seconds', 'second']
        eg. '1 month 2 weeks 1 day ago', '2 years 1 day ago 2:10:40', '20 minutes ago'
        the same number/unit pairs are accepted before 'from now' / 'fromnow' and after 'now+'.
        anydatetime('2000 January') == anydatetime('Jan 2000')
        i tried to read your mind, so, pinch of salt, and all that.
        if you do NOT want me to assume that the month comes before the day before the year, specify in_US=False.
        that variable is only used if s does not contain a month spelled out.
        format is a strftime pattern for the result; None means ISO_DATETIME_FORMAT.
        the length of each relative unit in seconds comes from anydatetime.levels.
        NOTE(review): the relative branch relies on chunkify, which is not defined in this part of the file.
    '''
    assert isinstance(s, (str, unicode)), "input s neither str, unicode"
    s = re.sub('[^\w\s\:\+]+', ' ', s).lower() # strip out commas, hyphens, slashes, etc.
    if s.endswith('ago') or s.startswith('now+') or s.endswith('from now') or s.endswith('fromnow'):
        # relative time
        tokens = s.split(' ')
        while '' in tokens:
            tokens.remove('')
        now = time.time()
        # drop the keyword token[s] and decide whether to move back or forward in time
        if s.endswith('ago'):
            tokens = tokens[:-1]
            direction = -1
        elif s.endswith('from now'):
            tokens = tokens[:-2]
            direction = 1
        elif s.endswith('fromnow'):
            tokens = tokens[:-1]
            direction = 1
        elif s.startswith('now+'):
            tokens = tokens[1:]
            direction = 1
        # the remaining tokens are consumed as (number, unit) pairs
        for n, level in chunkify(tokens, 2):
            if level.endswith('s'):
                level = level[:-1] # plural -> singular
            # pairs that are not "<digits> <known unit>" are silently ignored
            if n.isdigit() and (level in anydatetime.levels):
                now += anydatetime.levels[level] * int(n) * direction
        if format is None:
            return time.strftime(ISO_DATETIME_FORMAT, time.localtime(now))
        else:
            return time.strftime(format, time.localtime(now))
    elif s in ['now', '']:
        if format is None:
            return time.strftime(ISO_DATETIME_FORMAT)
        else:
            return time.strftime(format)
    else:
        # absolute time
        # start from today's date at midnight; every field is kept as a string
        date = {'Y':time.strftime('%Y'), 'm':time.strftime('%m'), 'd':time.strftime('%d'), 'H':'00', 'M':'00', 'S':'00'}
        # the leading None makes list.index() give the 1-based month/day number
        long_months = [None, 'january', 'february', 'march', 'april', 'may', 'june', 'july', 'august', 'september', 'october', 'november', 'december']
        short_months = [None] + [m[:3] for m in long_months[1:]]
        long_days = [None, 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
        short_days = [None] + [d[:3] for d in long_days[1:]]
        # february gets 29 days whenever the current year is divisible by 4
        days_in_month = dict(zip(range(1, 13), [31, (28 + int(0 == (int(date['Y']) % 4))), 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]))
        tokens = [token.lower() for token in re.split('[^\w\:]+', s)]
        day = None
        month = None
        uneaten_tokens = [] # 1- or 2-digit numbers whose meaning is decided after the loop
        for i, token in enumerate(tokens):
            if token in long_months:
                date['m'] = str(long_months.index(token)).zfill(2)
                month = token
            elif token in short_months:
                date['m'] = str(short_months.index(token)).zfill(2)
                month = token
            elif token in long_days:
                day = token
                # shift today's day-of-month by the distance between the named weekday and today's weekday
                date['d'] = str(long_days.index(token) + int(date['d']) - long_days.index(time.strftime('%A').lower())).zfill(2)
            elif token in short_days:
                day = token
                date['d'] = str(short_days.index(token) + int(date['d']) - short_days.index(time.strftime('%a').lower())).zfill(2)
            elif token == 'today':
                pass
            elif token == 'yesterday':
                date['d'] = str(int(date['d']) - 1).zfill(2)
            elif token == 'tomorrow':
                date['d'] = str(int(date['d']) + 1).zfill(2)
            elif (len(token) == 4) and token.isdigit() and (token[0] in '12'):
                # four digits starting with 1 or 2: a year
                date['Y'] = token
            elif re.match('(\d{1,2}\:){1,2}\d{1,2}', token):
                # HH:MM or HH:MM:SS
                token_parts = token.split(':')
                date['H'] = token_parts[0].zfill(2)
                date['M'] = token_parts[1].zfill(2)
                if len(token_parts) > 2:
                    date['S'] = token_parts[2].zfill(2)
            elif token.isdigit() and (len(token) < 3):
                uneaten_tokens.append(token)
        # look for day, month in case specified numerically; they're probably next to each other.
        if len(uneaten_tokens) == 1:
            token = uneaten_tokens[0]
            if (month is None) and (1 <= int(token) <= 12):
                date['m'] = token
            elif (day is None) and (1 <= int(token) <= 31):
                date['d'] = token
        elif (len(uneaten_tokens) >= 2) and (month is None) and (day is None):
            if len(uneaten_tokens) >= 3:
                # eat the 3rd uneaten token as a 2-digit year between 2000 and 2099
                date['Y'] = '20' + uneaten_tokens.pop(2)
            # a number above 12 can only be the day; otherwise in_US decides the order
            if 12 < int(uneaten_tokens[0]) < 32:
                date['d'] = uneaten_tokens[0]
                date['m'] = uneaten_tokens[1]
            elif 12 < int(uneaten_tokens[1]) < 32:
                date['m'] = uneaten_tokens[0]
                date['d'] = uneaten_tokens[1]
            elif in_US:
                date['m'] = uneaten_tokens[0]
                date['d'] = uneaten_tokens[1]
            else:
                date['d'] = uneaten_tokens[0]
                date['m'] = uneaten_tokens[1]
        if (int(date['d']) < 0):
            # day was last month... recursion!
            # re-derive month and day from a relative expression and take them from the ISO result
            date['m'], date['d'] = re.split('[\-\s]', anydatetime('%d days %d months ago' % (
                (abs(int(date['d'])) + int(time.strftime('%d'))),
                (int(time.strftime('%m')) - int(date['m'])),
            )))[1:3]
        elif (int(date['d']) > days_in_month[int(date['m'])]):
            # day is in next month... recursion!
            date['m'], date['d'] = re.split('[\-\s]', anydatetime('now+ %d days %d months' % (
                (int(date['d']) - int(time.strftime('%d'))),
                (int(date['m']) - int(time.strftime('%m'))),
            )))[1:3]
        # fill the ISO pattern by replacing each %X directive with the collected field
        result = ISO_DATETIME_FORMAT
        for k, v in date.iteritems():
            result = result.replace('%' + k, v)
        if format is not None:
            result = time.strftime(format, time.strptime(result, ISO_DATETIME_FORMAT))
        return result
# length in seconds of each relative unit name and its abbreviation
# [a month counts as 30 days; a year as 31556926 seconds]
anydatetime.levels = {
    'second':1, 'sec':1,
    'min':60, 'minute':60,
    'hour':3600, 'hr':3600,
    'day':86400, 'd':86400,
    'week':604800, 'wk':604800,
    'month':2592000, 'mon':2592000,
    'year':31556926, 'yr':31556926,
}
def debug(x):
    ''' print x, then return it unchanged, so a value can be inspected
    in the middle of a larger expression.
    '''
    print(x)
    return x
def restart(delay=2, script_name=__file__, exitcode=1):
    ''' relaunch script_name after delay seconds, then exit this process.

    a detached shell command is started which sleeps for delay seconds and
    then runs "python script_name" in the background; the current process
    exits with exitcode straight away.
    '''
    relaunch = 'python -c "import time, os; time.sleep(%(delay)f); os.system(\'python %(script_name)s &\')" &'
    os.system(relaunch % {'delay': delay, 'script_name': script_name})
    sys.exit(exitcode)
def nth_bit(s, n, msb=True):
    r''' return bit n (0 or 1) of the string or integer s.

    for an integer, bit 0 is the least significant bit.
    for a string, n is split into a character offset and a bit offset:
        msb=True  -- characters are counted from the END of the string and
                     bits from the least significant bit of each character,
                     i.e. the string is read like one big-endian number.
        msb=False -- characters are counted from the START of the string
                     and bits from the most significant bit of each one.
    any other type of s yields None.  an n beyond the end of the string
    raises IndexError.

    nth_bit('\x7f', 7) == nth_bit('\xfe', 7, False) == 0
    nth_bit('\1', 7, False) == nth_bit('\x80', 7) == 1
    '''
    # the one-line lambda which used to precede this def was shadowed by it
    # straight away (and disagreed with it for msb=False), so it is gone.
    try:
        integer_types = (int, long)
    except NameError: # interpreters without a separate long type
        integer_types = (int,)
    if isinstance(s, str):
        byte_n, bit_n = divmod(n, 8)
        if msb:
            byte_n = -1 - byte_n
        else:
            bit_n = 7 - bit_n
        return nth_bit(ord(s[byte_n]), bit_n)
    elif isinstance(s, integer_types):
        return (s >> n) & 1
def with_nth_bit(s, n, b, msb=True):
    r''' return a copy of the string or integer s with bit n set to b.

    b is used for its truth value: true sets the bit, false clears it.
    bits are numbered the same way nth_bit numbers them:
        integer   -- bit 0 is the least significant bit (msb is ignored).
        msb=True  -- characters counted from the END of the string, bits
                     from the least significant bit of each character.
        msb=False -- characters counted from the START of the string, bits
                     from the most significant bit of each character.
    any other type of s yields None.  an n which falls outside the string
    raises IndexError.

    with_nth_bit('\xff', 7, 0) == '\x7f'
    with_nth_bit('\xff', 7, 0, False) == '\xfe'
    with_nth_bit('\x00\x00', 0, 1) == '\x00\x01'
    with_nth_bit(255, 3, 0) == with_nth_bit(255, 3, 0, False) == 247
    '''
    try:
        integer_types = (int, long)
    except NameError: # interpreters without a separate long type
        integer_types = (int,)
    if isinstance(s, str):
        byte_n, bit_n = divmod(n, 8)
        if msb:
            # count characters from the end.  a non-negative index is used
            # on purpose: with a negative one, s[(byte_n + 1):] turns into
            # s[0:] for the last character and duplicates the whole string.
            byte_n = len(s) - 1 - byte_n
        else:
            bit_n = 7 - bit_n
        if not (0 <= byte_n < len(s)):
            raise IndexError('bit %d is outside a %d-character string' % (n, len(s)))
        changed = chr(with_nth_bit(ord(s[byte_n]), bit_n, b))
        return s[:byte_n] + changed + s[(byte_n + 1):]
    elif isinstance(s, integer_types):
        if b:
            return s | (1 << n)
        return s & ~(1 << n)
def sci(n, base=10, format='(%(mantissa)s * (%(base)s ** %(exponent)s))'):
    ''' return a string representing n in pythonic scientific notation base base.

    n      -- the number to represent; 0 gives mantissa 0.0 and exponent 0.
    base   -- the radix of the exponent part.
    format -- a %-style template which may use the keys mantissa, base
              and exponent.

    the mantissa is normalised so that 1 <= abs(mantissa) < base:
    sci(1000) == '(1.0 * (10 ** 3))' and sci(0.5) == '(5.0 * (10 ** -1))'.
    '''
    if n == 0:
        # log(0) is undefined; zero has no normalised form.
        exponent, mantissa = 0, 0.0
    else:
        # floor, not int(): int() rounds towards zero, which is off by one
        # for abs(n) < 1.
        exponent = int(math.floor(math.log(abs(n), base)))
        mantissa = float(n) / (base ** exponent)
        # math.log is inexact (math.log(1000, 10) is a hair under 3), so the
        # exponent can be one step out; nudge it back into range.
        if abs(mantissa) >= base:
            exponent += 1
            mantissa = float(n) / (base ** exponent)
        elif abs(mantissa) < 1:
            exponent -= 1
            mantissa = float(n) / (base ** exponent)
    return format % {
        'mantissa' : mantissa,
        'base' : base,
        'exponent' : exponent,
    }
# interactive shortcuts built on execc, which is defined elsewhere in this file.
# NOTE(review): presumably an execc object runs its code strings when the
# interpreter displays it and ret is the text shown afterwards -- confirm
# against the execc definition.
# clc: run os.system('clear').
clc = execc("os.system('clear')", ret='\b\b')
# several spellings bound to one object: run 'clear', then 'ls -Al'.
clcl = clclc = cllcl = ccllc = cllc = ccll = execc("os.system('clear')", "os.system('ls -Al')", ret='\b\b')
# x: run sys.exit().
x = execc('sys.exit()', ret='\b\b')
# lsmod: run h('modules').  NOTE(review): h is not defined in this chunk;
# presumably an alias for help -- confirm.
lsmod = execc("h('modules')", ret='\b\b')
# cmd(s): build an execc which passes the shell command s to os.system.
# s is pasted between single quotes, so it must not contain one itself.
cmd = lambda s: execc("os.system('%s')" % s)
# cmda(s): return a one-argument function which builds cmd(s + ' ' + a).
cmda = lambda s: lambda a: cmd(s + ' ' + a)
def is_port_open(server, port, protocol='tcp'):
    ''' return True when opening /dev/<protocol>/<server>/<port> from a
    shell produces no output at all (i.e. no error message).

    NOTE(review): the /dev/tcp and /dev/udp paths are a shell feature, not
    real files; presumably this needs os.popen4 to start a shell which
    supports them -- confirm on the target system.
    '''
    command = 'exec 5<>/dev/%s/%s/%s' % (protocol, server, port)
    pipes = os.popen4(command)
    # pipes[1] carries the combined stdout and stderr of the command.
    return '' == pipes[1].read()

def get_open_ports(server, protocol='tcp'):
    ''' return the list of every port number below 65536 for which
    is_port_open(server, port, protocol) is true.  ports are probed one
    after the other.
    '''
    open_ports = []
    for port in xrange(1<<16):
        if is_port_open(server, port, protocol):
            open_ports.append(port)
    return open_ports
def is_divisible_by(n, factor):
    ''' return True when dividing n by factor leaves no remainder.
    '''
    remainder = n % factor
    return remainder == 0
def is_prime(p):
    ''' return True if p is a prime number, False otherwise.

    anything which is not an integer (floats, strings, ...) and every
    integer below 2 is reported as not prime.  primality is decided by
    trial division by 2 and then by the odd numbers up to sqrt(p).
    '''
    try:
        integer_types = (int, long)
    except NameError: # interpreters without a separate long type
        integer_types = (int,)
    if not isinstance(p, integer_types) or (p <= 1):
        return False
    if p < 4:
        # 2 and 3; without this, 2 would be rejected by the even test below.
        return True
    if (p % 2) == 0:
        return False
    i = 3
    # i * i <= p keeps the bound in exact integer arithmetic; a float
    # sqrt loses precision for very large p.
    while i * i <= p:
        if (p % i) == 0:
            return False
        i += 2
    return True
class Solver(object):
    '''takes a function, named arg value (opt.) and returns a Solver object

    every positional parameter of the function becomes an attribute of the
    Solver.  assigning to such an attribute stores a value for that
    parameter; reading it solves f(...) == 0 for that parameter, with all
    the other parameters held at their stored values, and returns (and
    stores) the result.

    assigning to a name which is not a parameter of the function raises
    KeyError; so does reading one.
    '''
    def __init__(self, f, **args):
        self._f = f
        self._args = {}
        # see important note on order of operations in __setattr__ below.
        code = self._code()
        for arg in code.co_varnames[0:code.co_argcount]:
            self._args[arg] = None
        self._setargs(**args)

    def _code(self):
        '''returns the code object of the wrapped function'''
        try:
            return self._f.func_code
        except AttributeError: # interpreters which only provide __code__
            return self._f.__code__

    def __repr__(self):
        argstring = ','.join(['%s=%s' % (arg, str(value)) for (arg, value) in
            self._args.items()])
        name = self._code().co_name
        if argstring:
            return 'Solver(%s,%s)' % (name, argstring)
        else:
            return 'Solver(%s)' % name

    def __getattr__(self, name):
        '''used to extract function argument values'''
        try:
            args = self.__dict__['_args']
        except KeyError:
            # _args does not exist yet (instance created without __init__);
            # touching self._args here would re-enter __getattr__ forever.
            raise AttributeError(name)
        args[name] # raise KeyError if name is not an argument of the function
        return self._solve_for(name)

    def __setattr__(self, name, value):
        '''sets function argument values'''
        # Note - once self._args is created, no new attributes can
        # be added to self.__dict__. This is a good thing as it throws
        # an exception if you try to assign to an arg which is inappropriate
        # for the function in the solver.
        if '_args' in self.__dict__:
            if name in self._args:
                self._args[name] = value
            else:
                raise KeyError(name)
        else:
            object.__setattr__(self, name, value)

    def _setargs(self, **args):
        '''sets values of function arguments'''
        for arg in args:
            self._args[arg] # raise exception if arg not in _args
            setattr(self, arg, args[arg])

    def _solve_for(self, arg):
        '''Newton's method solver

        finds a value of arg for which the wrapped function returns 0,
        using a slope estimated from the two most recent points.  the
        stored value of arg is the starting guess (1 when it is None or 0).
        the result is stored back as the value of arg and returned.  when
        the iteration gives up, a message is printed and the last estimate
        is returned.
        '''
        TOL = 0.0000001 # tolerance
        ITERLIMIT = 1000 # iteration limit
        CLOSE_RUNS = 10 # after getting close, do more passes
        args = self._args
        # a missing (None) or zero value falls back to a starting guess of 1.
        x0 = self._args[arg] or 1
        x1 = x0 * 1.1
        def f(x):
            '''function to solve'''
            args[arg] = x
            return self._f(**args)
        fx0 = f(x0)
        n = 0
        close_flag = False
        while 1: # Newton's method loop here
            fx1 = f(x1)
            if fx1 == 0 or x1 == x0: # managed to nail it exactly
                break
            # NOTE(review): the next nine lines are reconstructed.  they had
            # been collapsed into the invalid "if abs(fx1-fx0)ITERLIMIT:";
            # the rebuilt logic is inferred from the TOL, CLOSE_RUNS and
            # close_flag names used around it -- confirm against the
            # original recipe if it is available.
            if abs(fx1 - fx0) < TOL: # very close
                close_flag = True
                if CLOSE_RUNS == 0: # been close several times
                    break
                CLOSE_RUNS -= 1 # try some more
            else:
                close_flag = False
            if n > ITERLIMIT:
                print("Failed to converge; exceeded iteration limit")
                break
            slope = (fx1 - fx0) / (x1 - x0)
            if slope == 0:
                if close_flag: # we're close but have zero slope, finish
                    break
                else:
                    print('Zero slope and not close enough to solution')
                    break
            x2 = x0 - fx0 / slope # New 'x1'
            fx0 = fx1
            x0 = x1
            x1 = x2
            n += 1
        self._args[arg] = x1
        return x1
def tvm(pv,fv,pmt,n,i):
    '''equation for time value of money

    pv  -- present value
    fv  -- future value
    pmt -- payment made each period
    n   -- number of periods
    i   -- interest rate per period, as a percentage (5 means 5%)

    returns pv*(1+r)**n + pmt*((1+r)**n - 1)/r - fv with r = i/100, which
    is 0 when the five values are consistent with each other.  for a rate
    of 0 the payment term is pmt*n, the limit of the expression above.
    '''
    # 100.0, not 100: with integer division an integer i below 100 would
    # silently become a rate of 0.
    rate = i / 100.0
    if rate == 0:
        return pv + pmt * n - fv
    growth = (1 + rate) ** n
    return pv * growth + pmt / rate * (growth - 1) - fv
def bin(data, n_bins):
    ''' count how many items of data fall between each pair of
    neighbouring bin edges; returns one count per pair, lowest bin first.

    the edges come from frange(min(data), max(data), step) with
    step = (max(data) - min(data)) / n_bins.  an item x belongs to a bin
    when lower <= x < upper.

    NOTE(review): frange is defined elsewhere; whether it yields the top
    edge decides how many bins there are and whether max(data) is ever
    counted -- confirm.  with integer data the step is computed with the
    division operator as-is, so it may be truncated -- confirm intent.
    '''
    lowest = min(data)
    highest = max(data)
    step = (highest - lowest) / n_bins
    bounds = frange(lowest, highest, step)
    counts = []
    for i in range(len(bounds) - 1):
        lower = bounds[i]
        upper = bounds[i + 1]
        counts.append(sum([lower <= x < upper for x in data]))
    return counts
# NOTE(review): the result is discarded, so this is presumably evaluated for
# its side effect -- running clc's os.system('clear') as the script loads.
# confirm that execc (defined elsewhere) executes its code on repr().
repr(clc)