# wpylib.math.fitting.stochastic module
# Created: 20150528
# Wirawan Purwanto
# Dependencies:
# - numpy
# - scipy
# - matplotlib (for visualization routines)
wpylib.math.fitting.stochastic module
Tools for stochastic curve fitting.
import numpy
import numpy.random
from wpylib.math.fitting import fit_func_base
from wpylib.math.stats.errorbar import errorbar
class StochasticFitting(object):
"""Standard stochastic fit procedure.
Class attributes:
* `func`: function ansatz to be fitted.
Set via init_func() method.
This `func` needs to be a descendant of the fit_func_base object,
or have an identical API, which are:
- method `fit`
- method `__call__` (i.e. a callable object)
debug = 0
dbg_guess_params = True
def_opt_report_final_params = 3
def __init__(self):
self.use_nlf_guess = 1
self.use_dy_weights = True
def init_func(self, func):
self.func = func
def init_samples(self, x, y, dy):
Initializes the sample data against which we will perform
the stochastic fitting.
This function takes N measurement samples:
- the (multidimensional) domain points, x
- the measured target points, y
- the uncertainty of the target points, dy
# fix (or, actually, provide an accomodation for) a common "mistake"
# for 1-D domain: make it standard by adding the "first" dimension
x = fit_func_base.domain_array(x)
self.samples_x = x
self.samples_y = numpy.array(y)
self.samples_dy = numpy.array(dy)
self.samples_wt = (self.samples_dy)**(-2)
if not (len(x[0]) == len(y) == len(dy)):
raise TypeError, "Length of x, y, dy arrays are not identical."
def init_rng(self, seed=None, rng_class=numpy.random.RandomState):
"""Initializes a standard random number generator for use in
the fitting routine."""
if seed == None:
seed = numpy.random.randint(numpy.iinfo(int).max)
print "Using random seed: ", seed
self.rng_seed = seed
self.rng = rng_class(seed)
def num_fit_params(self):
"""An ad-hoc way to determine the number of fitting parameters.
FIXME: There is still not an a priori way to find the number of
fit parameters in the fit_func_base class or its derivatives.
There are a few after-the-fact ways to determine this:
1) Once the "deterministic" nonlinear fit is done, you can find the
number of parameters by
2) Once the stochastic fit is done, you can also find the number
of fit parameters by
return len(self.log_nlf_params)
return len(self.log_mc_params[0])
raise RuntimeError, "Cannot determine the number of fit parameters."
def nlfit1(self):
"""Performs the non-stochastic, standard nonlinear fit.
The output is given in `nlf_rec` attribute."""
from numpy.linalg import norm
if self.use_dy_weights:
dy = self.samples_dy
dy = None
rslt =, self.samples_y, dy=dy)
self.log_nlf_params = rslt
self.nlf_f = self.func(self.log_nlf_params, self.samples_x)
last_fit = self.func.last_fit
mval_resid = self.nlf_f - self.samples_y
self.nlf_ussr = norm(mval_resid)**2
self.nlf_wssr = norm(mval_resid / self.samples_dy)**2
self.nlf_funcalls = last_fit['funcalls']
self.nlf_rec = last_fit
def mcfit_step1_toss_dice_(self):
"""Generates a single Monte Carlo dataset for the mcfit_step1_
self.dice_dy = self.rng.normal(size=len(self.samples_dy))
self.dice_y = self.samples_y + self.samples_dy * self.dice_dy
def mcfit_step1_(self):
"""Performs a single Monte Carlo data fit."""
# Var name conventions:
# - dice_* = values related to one "dice toss" of the sample
# - mval_* = values related to the mean value of the samples
# (i.e. samples_y)
# FIXME: In future this *could* be run in parallel but the state vars
# (such as dice_y, dice_dy, etc.) must be stored on per-thread basis.
from numpy.linalg import norm
if self.use_dy_weights:
dy = self.samples_dy
dy = None
rslt =, self.dice_y, dy=dy,
# fit result of the stochastic data
self.dice_params = rslt
self.dice_f = self.func(self.dice_params, self.samples_x)
if self.dbg_guess_params:
last_fit = self.func.last_fit
dice_resid = self.dice_f - self.dice_y
mval_resid = self.dice_f - self.samples_y
dice_ussr = norm(dice_resid)**2
dice_wssr = norm(dice_resid / self.samples_dy)**2
mval_ussr = norm(mval_resid)**2
mval_wssr = norm(mval_resid / self.samples_dy)**2
self.log_mc_stats.append((dice_ussr, dice_wssr, mval_ussr, mval_wssr))
def mcfit_step1_viz_(self, save=True):
"""Generates a visual representation of the last MC fit step.
from matplotlib import pyplot
if not hasattr(self, "fig"):
self.fig = pyplot.figure()
ax = self.fig.add_subplot(1, 1, 1)
title = "MC fit step %d" % self.mcfit_iter_num
x,y,dy = self.samples_x[0], self.samples_y, self.samples_dy
ax.errorbar(x=x, y=y, yerr=dy,
fmt="x", color="SlateGray", label="QMC",
samples_xmin = x.min()
samples_xmax = x.max()
samples_xrange = samples_xmax - samples_xmin
samples_ymin = y.min()
samples_ymax = y.max()
samples_yrange = samples_ymax - samples_ymin
len_plot_x = 10*len(y)
plot_x = numpy.linspace(start=samples_xmin - 0.03 * samples_xrange,
stop=samples_xmax + 0.03 * samples_xrange,
ax.plot(plot_x, self.func(self.nlf_rec.xopt, [plot_x]), "-",
color="SlateGray", label="nlfit")
ax.errorbar(x=x, y=self.dice_y, yerr=dy,
fmt="or", label="MC toss",
ax.plot(plot_x, self.func(self.dice_params, [plot_x]), "-",
color="salmon", label="MC fit")
samples_dy_max = numpy.max(self.samples_dy)
ax.set_ylim((samples_ymin - samples_dy_max * 8, samples_ymax + samples_dy_max * 8))
if save:
self.fig.savefig("mcfit-%04d.png" % self.mcfit_iter_num)
def mcfit_loop_begin_(self):
"""Performs final initialization before firing up mcfit_loop_.
This need to be done only before the first mcfit_loop_() call;
if more samples are collected later, then this routine should NOT be
called again or else all the accumulators would reset."""
self.log_guess_params = []
self.log_mc_params = []
self.log_mc_stats = []
self.log_mc_funcalls = []
if self.use_nlf_guess:
print "Using guess param from NLF: ",
self.dice_param_guess = self.log_nlf_params
#print "- Params = ", self.log_nlf_params
print self.log_nlf_params
self.dice_param_guess = None
def mcfit_loop_end_(self):
"""Performs final initialization before firing up do_mc_fitting:
- Repackage log_mc_stats and log_mc_params as numpy array of structs
# Number of fit parameters:
num_params = len(self.log_mc_params[0])
#if True:
pnames = self.func.param_names
assert len(pnames) == num_params # Otherwise it will be faulty
if self.func.use_lmfit_method:
#from lmfit import Parameter
ptype = float
ptype = float
param_dtype = [ (i, ptype) for i in pnames ]
param_dtype = [ ("C"+str(i), float) for i in xrange(num_params) ]
stats_dtype = [ (i, float) for i in ('dice_ussr', 'dice_wssr', 'mval_ussr', 'mval_wssr') ]
# Can't initialize the self.mc_params array in a single step with
# numpy.array construction function; we must copy the records one by one.
# The reason is this: each element of the log_mc_params list is already
# a numpy ndarray object.
self.mc_params = numpy.empty((len(self.log_mc_params),), dtype=param_dtype)
for (i,p) in enumerate(self.log_mc_params):
if self.func.use_lmfit_method:
self.mc_params[i] = tuple(p)
self.mc_params[i] = p
self.mc_stats = numpy.array(self.log_mc_stats, dtype=stats_dtype)
self.fit_parameters = [ p[0] for p in param_dtype ]
def mcfit_analysis_(self):
"""Performs analysis of the Monte Carlo fitting.
This version does no weighting or filtering based on some cutoff criteria.
flds = self.fit_parameters # == self.mc_params.dtype.names
rslt = {}
for F in flds:
mean = numpy.average(self.mc_params[F])
err = numpy.std(self.mc_params[F])
rslt[F] = errorbar(mean, err)
self.final_mc_params = rslt
def mcfit_loop1_(self, num_iter, save_fig=0):
"""Performs the Monte-Carlo fit simulation after the
input parameters are set up."""
for i in xrange(num_iter):
self.mcfit_iter_num = i
if self.debug >= 2:
print "mcfit_loop1_: iteration %d" % i
if save_fig:
def mcfit_report_final_params(self, format=None):
if format == None:
format = getattr(self, "opt_report_final_params", self.def_opt_report_final_params)
if format in (None, False, 0):
return # quiet!
parm = self.final_mc_params
if format == 3:
print "Final parameters :",
print " ".join([
"%s" % (parm[k],) for k in self.fit_parameters
elif format == 2:
print "Final parameters:"
print "\n".join([
" %s = %s" % (k, parm[k])
for k in self.fit_parameters
elif format == 1:
print "Final parameters:"
print parm
def mcfit_run1(self, x=None, y=None, dy=None, data=None, func=None, rng_params=None,
num_iter=100, save_fig=False):
"""The main routine to perform stochastic fit."""
if data != None:
raise NotImplementedError
elif dy != None:
# Assume OK
elif y != None and dy == None:
y_orig = y
y = errorbar_mean(y_orig)
dy = errorbar_err(y_orig)
raise TypeError, "Invalid argument combination for the input data."
if func != None:
if not hasattr(self, "func"):
raise RuntimeError, \
"The fit function in the fitting object is undefined."
if rng_params != None:
elif not hasattr(self, "rng"):
self.mcfit_loop1_(num_iter=num_iter, save_fig=save_fig)
return self.final_mc_params
# The two routines below gives convenient way to evaluate the
# fitted curve at arbitrary x values (good so long as they are not
# far out from the range given by self.samples_x)
def mcfit_eval_raw(self, x=None, yscale=1.0):
"""Evaluates the curve (y) values for a given set of x value(s).
This routine generates the raw values based on the stochastically
sampled parameter values."""
if x == None:
x = self.samples_x
x = fit_func_base.domain_array(x)
xlen = len(x[0])
mc_curve_y = numpy.empty((len(self.mc_params), xlen))
# The following loop could have been written as a batch operation,
# but it requires some nontrivial change in the convention of how
# fit_func_base.__call__() is written.
# Double broadcasting and other dimensional retrofitting/reduction
# ('dot product'?) may be required.
# Example: in harm_fit_func class, the statement
# xdisp = (x[0] - C[2])
# will have to be changed becasuse the length of x[0]
# (which is the number of data points in the "x" argument)
# and the length of C[2]
# (which is the number of MC iterations)
# will not match--and these numbers must NOT be subtracted elementwise!
for (i,ppp) in enumerate(self.mc_params):
mc_curve_y[i] = self.func(ppp, x)
return mc_curve_y
def mcfit_eval(self, x=None, yscale=1.0, ddof=1, outfmt=0):
"""Evaluates the curve (y) values for a given set of x value(s).
This routine generates the finalized values (with errorbar estimate)
based on the stochastically sampled parameter values."""
# The default delta degree of freedom (ddof) should be 1 because we need
# to take one out for the average itself.
# If you need to reproduce old result, can revert to ddof=0.
if x == None:
x = self.samples_x
x = fit_func_base.domain_array(x)
mc_curve_y = self.mcfit_eval_raw(x=x)
xlen = len(x[0])
final_mc_curve = numpy.empty((xlen,), dtype=[('val',float),('err',float)])
final_mc_curve['val'] = numpy.average(mc_curve_y, axis=0)
final_mc_curve['err'] = numpy.std(mc_curve_y, axis=0, ddof=ddof)
if yscale != 1.0:
final_mc_curve['val'] *= yscale
final_mc_curve['err'] *= yscale
if outfmt == 0:
pass # already in that format
elif outfmt == 1:
# Formatted as an array of "errorbar" objects
final_mc_curve = numpy.array([errorbar(y,dy) for (y,dy) in final_mc_curve], dtype=errorbar)
raise ValueError, "Unsupported outfmt value=%s." % (outfmt,)
return final_mc_curve
def mcfit_dump_param_samples(self, out):
"""Dump the generated parameter samples for diagnostic purposes.
O = text_output(out)
pnames = self.mc_params.dtype.names
snames = self.mc_stats.dtype.names
O.write("# %s ; %s ; nfev\n" % (" ".join(pnames), " ".join(snames)))
O.write(matrix_str(array_hstack([ self.mc_params[k] for k in pnames ] + \
[ self.mc_stats[k] for k in snames ] + \
[ self.log_mc_funcalls]),
" %#17.10g")+ "\n")
def plot_curve_errorbar(sfit, fig=None, fig_axis=0,
colors=('0.80', 'red', 'green'),
keys=('Stochastic fit', 'Experiment', 'Deterministic'),
title='Comparing fits: stochastic vs deterministic'):
"""Plots the curve with errorbar & final MC fit of the curve.
The input object is the MC fitting output (a StochasticFitting object).
This routine draws the stochastic fit result (shade, midpoint)
together with the raw y +/- dy data (if available) and
the deterministic nonlinear fit.
The `colors` is a 3-tuple argument controlling the color and
possibly other attributes of each plot.
Each tuple member can be a valid color argument (original way), or a dict
(more control).
Set this to None if you want to turn off a plot."""
from matplotlib import pyplot
if fig == None:
fig = pyplot.figure()
ax = fig.add_subplot(1, 1, 1)
ax = fig.axes[fig_axis]
samples_x = sfit.samples_x[0]
samples_xmin = samples_x.min()
samples_xmax = samples_x.max()
samples_xrange = samples_xmax - samples_xmin
if len_plot_x == None:
len_plot_x = 4*len(samples_x)
plot_x = numpy.linspace(start=samples_xmin - 0.03 * samples_xrange,
stop=samples_xmax + 0.03 * samples_xrange,
#make_curve_errorbar(sfit, plot_x)
# -> use mcfit_eval instead:
final_mc_curve = sfit.mcfit_eval(x=plot_x)
mc_y = final_mc_curve['val']
mc_dy = final_mc_curve['err']
# save info on axis class (!!!)
ax._plot_mc_curve_x = plot_x[0]
ax._plot_mc_curve_y = final_mc_curve
if False:
print plot_x[0]
print mc_y
global dbg
dbg = struct()
dbg.x = plot_x[0]
dbg.y = mc_y
def colordict(c, **defaults):
cdict = defaults.copy()
if isinstance(c, dict):
cdict.update(color=c) # old default--scalar is a color argument.
return cdict
# *1* Smooth plot (the stochastic fit result), errorbar shading:
if colors[0] is not None:
ax.fill_between(x=plot_x[0], y1=mc_y-mc_dy, y2=mc_y+mc_dy, **colordict(colors[0], alpha=0.55))
#print "colordict = ", colordict(colors[0], alpha=0.55)
# *2* Smooth plot (the stochastic fit result), mid-points only (no connecting line):
if colors[1] is not None:
ax.plot(plot_x[0], mc_y, 'x', label=keys[0], **colordict(colors[1]))
#ax.errorbar(x=plot_x[0], y=mc_y, yerr=mc_dy, fmt='+-', label='Stochastic fit')
# *3* Raw data point:
if hasattr(sfit, "samples_y") and hasattr(sfit, "samples_dy"):
ax.errorbar(x=samples_x, y=sfit.samples_y, yerr=sfit.samples_dy,
fmt="o", label=keys[1],
if colors[2] is not None:
# *4* Smooth plot, line from deterministic fitting
# This will significantly differ from *2* if the stochastic plot fails.
nlf_y = sfit.func(sfit.log_nlf_params, plot_x)
ax.plot(plot_x[0], nlf_y, label=keys[2],
return fig