#
# wpylib.math.fitting.stochastic module
# Created: 20150528
# Wirawan Purwanto
#
# Dependencies:
# - numpy
# - scipy
# - matplotlib (for visualization routines)
#

"""
wpylib.math.fitting.stochastic module

Tools for stochastic curve fitting.
"""

import numpy
import numpy.random

from wpylib.math.fitting import fit_func_base
# NOTE: errorbar_mean and errorbar_err are used by mcfit_run1() below but
# were not imported in the original source; they are assumed to be exported
# by the same errorbar module.
from wpylib.math.stats.errorbar import errorbar, errorbar_mean, errorbar_err


class StochasticFitting(object):
  """Standard stochastic fit procedure.

  Class attributes:

  * `func`: the function ansatz to be fitted.
    Set via the init_func() method.
    This `func` needs to be a descendant of the fit_func_base class,
    or have an identical API, which is:
    - method `fit`
    - method `__call__` (i.e. a callable object)
  """
  debug = 0
  dbg_guess_params = True
  def_opt_report_final_params = 3

  def __init__(self):
    self.use_nlf_guess = True
    self.use_dy_weights = True

  def init_func(self, func):
    self.func = func

  def init_samples(self, x, y, dy):
    """Initializes the sample data against which we will perform the
    stochastic fitting.

    This function takes N measurement samples:
    - the (multidimensional) domain points, x
    - the measured target points, y
    - the uncertainty of the target points, dy
    """
    # Fix (or, actually, provide an accommodation for) a common "mistake"
    # for a 1-D domain: make it standard by adding the "first" dimension.
    x = fit_func_base.domain_array(x)
    self.samples_x = x
    self.samples_y = numpy.array(y)
    self.samples_dy = numpy.array(dy)
    self.samples_wt = self.samples_dy**(-2)
    if not (len(x[0]) == len(y) == len(dy)):
      raise TypeError, "Lengths of the x, y, dy arrays are not identical."

  def init_rng(self, seed=None, rng_class=numpy.random.RandomState):
    """Initializes a standard random number generator for use in the
    fitting routine."""
    if seed is None:
      seed = numpy.random.randint(numpy.iinfo(int).max)
    print "Using random seed: ", seed
    self.rng_seed = seed
    self.rng = rng_class(seed)

  def num_fit_params(self):
    """An ad-hoc way to determine the number of fitting parameters.

    FIXME: There is still no a priori way to find the number of fit
    parameters in the fit_func_base class or its derivatives.
    There are a few after-the-fact ways to determine this:

    1) Once the "deterministic" nonlinear fit is done, you can find the
       number of parameters by len(self.log_nlf_params).

    2) Once the stochastic fit is done, you can also find the number of
       fit parameters by len(self.log_mc_params[0]).
    """
    try:
      return len(self.log_nlf_params)
    except:
      pass
    try:
      return len(self.log_mc_params[0])
    except:
      pass
    raise RuntimeError, "Cannot determine the number of fit parameters."
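  # For illustration: a minimal `func` object with the API described in
  # the class docstring could look like the (hypothetical) sketch below,
  # assuming fit_func_base supplies the `fit` machinery and guess handling:
  #
  #   class linear_fit_func(fit_func_base):
  #     """Straight-line ansatz: y = C[0] + C[1]*x."""
  #     param_names = ('a', 'b')
  #     def __call__(self, C, x):
  #       return C[0] + C[1] * x[0]
  #
  # The `param_names` attribute is optional; mcfit_loop_end_() below falls
  # back to generic names ("C0", "C1", ...) when it is missing.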
  def nlfit1(self):
    """Performs the non-stochastic, standard nonlinear fit.
    The output is given in the `nlf_rec` attribute."""
    from numpy.linalg import norm
    if self.use_dy_weights:
      dy = self.samples_dy
    else:
      dy = None
    rslt = self.func.fit(self.samples_x, self.samples_y, dy=dy)
    self.log_nlf_params = rslt
    self.nlf_f = self.func(self.log_nlf_params, self.samples_x)
    last_fit = self.func.last_fit
    mval_resid = self.nlf_f - self.samples_y
    self.nlf_ussr = norm(mval_resid)**2
    self.nlf_wssr = norm(mval_resid / self.samples_dy)**2
    self.nlf_funcalls = last_fit['funcalls']
    self.nlf_rec = last_fit

  def mcfit_step1_toss_dice_(self):
    """Generates a single Monte Carlo dataset for the mcfit_step1_
    procedure."""
    self.dice_dy = self.rng.normal(size=len(self.samples_dy))
    self.dice_y = self.samples_y + self.samples_dy * self.dice_dy

  def mcfit_step1_(self):
    """Performs a single Monte Carlo data fit."""
    # Variable name conventions:
    # - dice_* = values related to one "dice toss" of the sample
    # - mval_* = values related to the mean value of the samples
    #   (i.e. samples_y)
    # FIXME: In the future this *could* be run in parallel, but the state
    # variables (such as dice_y, dice_dy, etc.) must then be stored on a
    # per-thread basis.
    from numpy.linalg import norm
    self.mcfit_step1_toss_dice_()
    if self.use_dy_weights:
      dy = self.samples_dy
    else:
      dy = None
    rslt = self.func.fit(self.samples_x, self.dice_y, dy=dy,
                         Guess=self.dice_param_guess,
                        )
    # fit result of the stochastic data:
    self.dice_params = rslt
    self.log_mc_params.append(rslt)
    self.dice_f = self.func(self.dice_params, self.samples_x)
    if self.dbg_guess_params:
      self.log_guess_params.append(self.func.guess_params)
    last_fit = self.func.last_fit
    dice_resid = self.dice_f - self.dice_y
    mval_resid = self.dice_f - self.samples_y
    dice_ussr = norm(dice_resid)**2
    dice_wssr = norm(dice_resid / self.samples_dy)**2
    mval_ussr = norm(mval_resid)**2
    mval_wssr = norm(mval_resid / self.samples_dy)**2
    self.log_mc_stats.append((dice_ussr, dice_wssr, mval_ussr, mval_wssr))
    self.log_mc_funcalls.append(last_fit['funcalls'])

  def mcfit_step1_viz_(self, save=True):
    """Generates a visual representation of the last MC fit step."""
    from matplotlib import pyplot
    if not hasattr(self, "fig"):
      self.fig = pyplot.figure()
    self.fig.clf()
    ax = self.fig.add_subplot(1, 1, 1)
    title = "MC fit step %d" % self.mcfit_iter_num
    ax.set_title(title)
    x, y, dy = self.samples_x[0], self.samples_y, self.samples_dy
    ax.errorbar(x=x, y=y, yerr=dy,
                fmt="x", color="SlateGray", label="QMC",
               )
    samples_xmin = x.min()
    samples_xmax = x.max()
    samples_xrange = samples_xmax - samples_xmin
    samples_ymin = y.min()
    samples_ymax = y.max()
    samples_yrange = samples_ymax - samples_ymin
    len_plot_x = 10 * len(y)
    plot_x = numpy.linspace(start=samples_xmin - 0.03 * samples_xrange,
                            stop=samples_xmax + 0.03 * samples_xrange,
                            num=len_plot_x, endpoint=True)
    ax.plot(plot_x, self.func(self.nlf_rec.xopt, [plot_x]),
            "-", color="SlateGray", label="nlfit")
    ax.errorbar(x=x, y=self.dice_y, yerr=dy,
                fmt="or", label="MC toss",
               )
    ax.plot(plot_x, self.func(self.dice_params, [plot_x]),
            "-", color="salmon", label="MC fit")
    samples_dy_max = numpy.max(self.samples_dy)
    ax.set_ylim((samples_ymin - samples_dy_max * 8,
                 samples_ymax + samples_dy_max * 8))
    if save:
      self.fig.savefig("mcfit-%04d.png" % self.mcfit_iter_num)
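  # The dice-toss step above is a parametric bootstrap: each synthetic
  # dataset perturbs every sample point by its own Gaussian error,
  #
  #   dice_y[i] = samples_y[i] + samples_dy[i] * g[i],   g[i] ~ N(0, 1),
  #
  # and the ansatz is refitted against each toss.  A standalone sketch of
  # the same resampling (plain numpy, hypothetical free-standing names):
  #
  #   rng = numpy.random.RandomState(12345)
  #   dice_y = samples_y + samples_dy * rng.normal(size=len(samples_dy))
  #
  # The spread of the resulting fit parameters across many tosses is what
  # mcfit_analysis_() below reports as the parameter uncertainty.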
  def mcfit_loop_begin_(self):
    """Performs final initialization before firing up the MC fitting loop
    (mcfit_loop1_).
    This needs to be done only before the first mcfit_loop1_() call;
    if more samples are collected later, then this routine should NOT be
    called again, or else all the accumulators would be reset."""
    self.log_guess_params = []
    self.log_mc_params = []
    self.log_mc_stats = []
    self.log_mc_funcalls = []
    if self.use_nlf_guess:
      print "Using guess param from NLF: ",
      self.nlfit1()
      self.dice_param_guess = self.log_nlf_params
      #print "- Params = ", self.log_nlf_params
      print self.log_nlf_params
    else:
      self.dice_param_guess = None

  def mcfit_loop_end_(self):
    """Performs final post-processing after the MC fitting loop:
    - Repackages log_mc_stats and log_mc_params as numpy arrays of structs.
    """
    # Number of fit parameters:
    num_params = len(self.log_mc_params[0])
    #if True:
    try:
      pnames = self.func.param_names
      assert len(pnames) == num_params  # otherwise it will be faulty
      if self.func.use_lmfit_method:
        #from lmfit import Parameter
        ptype = float
      else:
        ptype = float
      param_dtype = [ (i, ptype) for i in pnames ]
    except:
      param_dtype = [ ("C" + str(i), float) for i in xrange(num_params) ]
    stats_dtype = [ (i, float)
                    for i in ('dice_ussr', 'dice_wssr',
                              'mval_ussr', 'mval_wssr') ]
    # We can't initialize the self.mc_params array in a single step with
    # the numpy.array construction function; we must copy the records one
    # by one.  The reason: each element of the log_mc_params list is
    # already a numpy ndarray object.
    self.mc_params = numpy.empty((len(self.log_mc_params),),
                                 dtype=param_dtype)
    for (i, p) in enumerate(self.log_mc_params):
      if self.func.use_lmfit_method:
        self.mc_params[i] = tuple(p)
      else:
        self.mc_params[i] = p
    self.mc_stats = numpy.array(self.log_mc_stats, dtype=stats_dtype)
    self.fit_parameters = [ p[0] for p in param_dtype ]

  def mcfit_analysis_(self):
    """Performs analysis of the Monte Carlo fitting.
    This version does no weighting or filtering based on some cutoff
    criteria.
    """
    flds = self.fit_parameters  # == self.mc_params.dtype.names
    rslt = {}
    for F in flds:
      mean = numpy.average(self.mc_params[F])
      err = numpy.std(self.mc_params[F])
      rslt[F] = errorbar(mean, err)
    self.final_mc_params = rslt

  def mcfit_loop1_(self, num_iter, save_fig=0):
    """Performs the Monte Carlo fit simulation after the input parameters
    are set up."""
    for i in xrange(num_iter):
      self.mcfit_iter_num = i
      if self.debug >= 2:
        print "mcfit_loop1_: iteration %d" % i
      self.mcfit_step1_()
      if save_fig:
        self.mcfit_step1_viz_(save=True)

  def mcfit_report_final_params(self, format=None):
    if format is None:
      format = getattr(self, "opt_report_final_params",
                       self.def_opt_report_final_params)
    if format in (None, False, 0):
      return  # quiet!
    parm = self.final_mc_params
    if format == 3:
      print "Final parameters :",
      print " ".join([ "%s" % (parm[k],) for k in self.fit_parameters ])
    elif format == 2:
      print "Final parameters:"
      print "\n".join([ "  %s = %s" % (k, parm[k])
                        for k in self.fit_parameters ])
    elif format == 1:
      print "Final parameters:"
      print parm
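  # For illustration: with fit parameters named 'a' and 'b' (or the
  # generic fallback names 'C0', 'C1'), mcfit_analysis_() leaves behind a
  # dict of errorbar objects,
  #
  #   self.final_mc_params == {'a': errorbar(mean_a, err_a),
  #                            'b': errorbar(mean_b, err_b)},
  #
  # where the means and errors are taken across the MC parameter samples;
  # mcfit_report_final_params() merely pretty-prints this dict in one of
  # the three formats above.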
  def mcfit_run1(self, x=None, y=None, dy=None, data=None,
                 func=None, rng_params=None,
                 num_iter=100, save_fig=False):
    """The main routine to perform a stochastic fit."""
    if data is not None:
      raise NotImplementedError
    elif dy is not None:
      # Assume x, y, dy are all given: OK.
      pass
    elif y is not None and dy is None:
      y_orig = y
      y = errorbar_mean(y_orig)
      dy = errorbar_err(y_orig)
    else:
      raise TypeError, "Invalid argument combination for the input data."
    if func is not None:
      self.init_func(func)
    if not hasattr(self, "func"):
      raise RuntimeError, \
        "The fit function in the fitting object is undefined."
    self.init_samples(x=x, y=y, dy=dy)
    if rng_params is not None:
      self.init_rng(**rng_params)
    elif not hasattr(self, "rng"):
      self.init_rng()
    self.mcfit_loop_begin_()
    self.mcfit_loop1_(num_iter=num_iter, save_fig=save_fig)
    self.mcfit_loop_end_()
    self.mcfit_analysis_()
    self.mcfit_report_final_params()
    return self.final_mc_params

  # The two routines below give a convenient way to evaluate the fitted
  # curve at arbitrary x values (good so long as they are not far outside
  # the range given by self.samples_x).

  def mcfit_eval_raw(self, x=None, yscale=1.0):
    """Evaluates the curve (y) values for a given set of x value(s).
    This routine generates the raw values based on the stochastically
    sampled parameter values."""
    # (yscale is accepted for API symmetry with mcfit_eval but is not
    # used here.)
    if x is None:
      x = self.samples_x
    else:
      x = fit_func_base.domain_array(x)
    xlen = len(x[0])
    mc_curve_y = numpy.empty((len(self.mc_params), xlen))
    # The following loop could have been written as a batch operation,
    # but that would require some nontrivial changes in the convention of
    # how fit_func_base.__call__() is written.
    # Double broadcasting and other dimensional retrofitting/reduction
    # ('dot product'?) may be required.
    # Example: in the harm_fit_func class, the statement
    #     xdisp = (x[0] - C[2])
    # would have to be changed because the length of x[0]
    # (which is the number of data points in the "x" argument)
    # and the length of C[2]
    # (which is the number of MC iterations)
    # will not match--and these numbers must NOT be subtracted elementwise!
    for (i, ppp) in enumerate(self.mc_params):
      mc_curve_y[i] = self.func(ppp, x)
    return mc_curve_y

  def mcfit_eval(self, x=None, yscale=1.0, ddof=1, outfmt=0):
    """Evaluates the curve (y) values for a given set of x value(s).
    This routine generates the finalized values (with an errorbar
    estimate) based on the stochastically sampled parameter values."""
    # WARNING: CONVENTION CHANGES FROM THE ORIGINAL make_curve_errorbar()
    # ROUTINE!
    # The default delta degrees of freedom (ddof) should be 1 because we
    # need to take one out for the average itself.
    # If you need to reproduce an old result, revert to ddof=0.
    if x is None:
      x = self.samples_x
    else:
      x = fit_func_base.domain_array(x)
    mc_curve_y = self.mcfit_eval_raw(x=x)
    xlen = len(x[0])
    final_mc_curve = numpy.empty((xlen,),
                                 dtype=[('val', float), ('err', float)])
    final_mc_curve['val'] = numpy.average(mc_curve_y, axis=0)
    final_mc_curve['err'] = numpy.std(mc_curve_y, axis=0, ddof=ddof)
    if yscale != 1.0:
      final_mc_curve['val'] *= yscale
      final_mc_curve['err'] *= yscale
    if outfmt == 0:
      pass  # already in that format
    elif outfmt == 1:
      # Format as an array of "errorbar" objects:
      final_mc_curve = numpy.array([ errorbar(y, dy)
                                     for (y, dy) in final_mc_curve ],
                                   dtype=errorbar)
    else:
      raise ValueError, "Unsupported outfmt value=%s." % (outfmt,)
    return final_mc_curve

  def mcfit_dump_param_samples(self, out):
    """Dumps the generated parameter samples for diagnostic purposes."""
    # NOTE: The helper imports below follow this module's local-import
    # style; they were missing in the original source and the wpylib
    # module paths are assumed.
    from wpylib.iofmt.text_output import text_output
    from wpylib.text_tools import matrix_str
    from wpylib.array_tools import array_hstack
    O = text_output(out)
    pnames = self.mc_params.dtype.names
    snames = self.mc_stats.dtype.names
    O.write("# %s ; %s ; nfev\n" % (" ".join(pnames), " ".join(snames)))
    O.write(matrix_str(array_hstack([ self.mc_params[k] for k in pnames ]
                                    + [ self.mc_stats[k] for k in snames ]
                                    + [ self.log_mc_funcalls ]),
                       " %#17.10g") + "\n")
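# A minimal usage sketch for StochasticFitting (all free-standing names
# below are hypothetical: `some_fit_func` stands for any
# fit_func_base-compatible ansatz object, and x, y, dy are the
# measurement arrays):
#
#   sfit = StochasticFitting()
#   final = sfit.mcfit_run1(x=x, y=y, dy=dy, func=some_fit_func,
#                           rng_params={'seed': 12345},
#                           num_iter=200)
#   curve = sfit.mcfit_eval(x=x)   # fitted curve with errorbar estimates
#   sfit.mcfit_dump_param_samples("params.dat")
#
# mcfit_run1() returns the same dict that it stores in final_mc_params.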
def plot_curve_errorbar(sfit, fig=None, fig_axis=0, len_plot_x=None,
                        colors=('0.80', 'red', 'green'),
                        keys=('Stochastic fit', 'Experiment',
                              'Deterministic'),
                        title='Comparing fits: stochastic vs deterministic'):
  """Plots the curve with errorbars & the final MC fit of the curve.
  The input object is the MC fitting output (a StochasticFitting object).

  This routine draws the stochastic fit result (shade, midpoints)
  together with the raw y +/- dy data (if available) and the
  deterministic nonlinear fit.

  The `colors` argument is a 3-tuple controlling the color and possibly
  other attributes of each plot.
  Each tuple member can be a valid color argument (original way), or a
  dict (for more control).
  Set a member to None if you want to turn off the corresponding plot."""
  from matplotlib import pyplot
  if fig is None:
    fig = pyplot.figure()
    fig.clf()
    ax = fig.add_subplot(1, 1, 1)
    ax.set_title(title)
  else:
    ax = fig.axes[fig_axis]
  samples_x = sfit.samples_x[0]
  samples_xmin = samples_x.min()
  samples_xmax = samples_x.max()
  samples_xrange = samples_xmax - samples_xmin
  if len_plot_x is None:
    len_plot_x = 4 * len(samples_x)
  plot_x = numpy.linspace(start=samples_xmin - 0.03 * samples_xrange,
                          stop=samples_xmax + 0.03 * samples_xrange,
                          num=len_plot_x,
                          endpoint=True).reshape((1, len_plot_x))
  #make_curve_errorbar(sfit, plot_x)  # -> use mcfit_eval instead:
  final_mc_curve = sfit.mcfit_eval(x=plot_x)
  mc_y = final_mc_curve['val']
  mc_dy = final_mc_curve['err']
  # Save the curve info on the axis object (!!!)
  ax._plot_mc_curve_x = plot_x[0]
  ax._plot_mc_curve_y = final_mc_curve
  if False:
    # Dead debugging hook from the original source; note that `struct`
    # is not imported in this module.
    print plot_x[0]
    print mc_y
    global dbg
    dbg = struct()
    dbg.x = plot_x[0]
    dbg.y = mc_y

  def colordict(c, **defaults):
    cdict = defaults.copy()
    if isinstance(c, dict):
      cdict.update(c)
    else:
      cdict.update(color=c)  # old default: a scalar is a color argument
    return cdict

  # *1* Smooth plot (the stochastic fit result), errorbar shading:
  if colors[0] is not None:
    ax.fill_between(x=plot_x[0], y1=mc_y - mc_dy, y2=mc_y + mc_dy,
                    **colordict(colors[0], alpha=0.55))
    #print "colordict = ", colordict(colors[0], alpha=0.55)
  # *2* Smooth plot (the stochastic fit result), mid-points only
  #     (no connecting line):
  if colors[1] is not None:
    ax.plot(plot_x[0], mc_y, 'x', label=keys[0], **colordict(colors[1]))
    #ax.errorbar(x=plot_x[0], y=mc_y, yerr=mc_dy, fmt='+-',
    #            label='Stochastic fit')
    # *3* Raw data points:
    if hasattr(sfit, "samples_y") and hasattr(sfit, "samples_dy"):
      ax.errorbar(x=samples_x, y=sfit.samples_y, yerr=sfit.samples_dy,
                  fmt="o", label=keys[1], **colordict(colors[1]))
  if colors[2] is not None:
    # *4* Smooth plot, line from the deterministic fit.
    # This will differ significantly from *2* if the stochastic fit fails.
    nlf_y = sfit.func(sfit.log_nlf_params, plot_x)
    ax.plot(plot_x[0], nlf_y, label=keys[2], **colordict(colors[2]))
  ax.legend()
  return fig
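if __name__ == "__main__":
  # Self-contained demonstration of the parametric-bootstrap idea used by
  # StochasticFitting, written against plain scipy.optimize.curve_fit
  # instead of the fit_func_base machinery (a sketch, not this module's
  # API; the quadratic model and all numbers are illustrative only):
  from scipy.optimize import curve_fit

  def model(x, a, b):
    return a + b * x**2

  rng = numpy.random.RandomState(12345)
  demo_x = numpy.linspace(0.0, 1.0, 11)
  demo_dy = 0.02 * numpy.ones(len(demo_x))
  demo_y = model(demo_x, 1.0, -0.5) + demo_dy * rng.normal(size=len(demo_x))

  P = []
  for i in xrange(200):
    # one "dice toss": perturb each point by its own Gaussian error
    y_toss = demo_y + demo_dy * rng.normal(size=len(demo_x))
    popt, pcov = curve_fit(model, demo_x, y_toss, sigma=demo_dy)
    P.append(popt)
  P = numpy.array(P)
  # The mean & spread across tosses play the role of final_mc_params:
  print "bootstrap a =", P[:,0].mean(), "+/-", P[:,0].std(ddof=1)
  print "bootstrap b =", P[:,1].mean(), "+/-", P[:,1].std(ddof=1)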