The other reason for this post is to make a point about code robustness. Getting numerical derivatives right is a notoriously difficult process, and getting them both efficient and right is even harder. The package I have been using, MINUIT, has been used by the particle physicists for decades, and they have hammered out all sorts of corner cases that a naive implementation might miss. So I'm not going to use a naive implementation, not least because my problem is numerically difficult. This means the code is not simple, and I don't want to try to restructure it manually to be more concurrent; I want the computer to handle the concurrency for me.
MINUIT does its numerical optimization using a "quasi-Newton method". That is, it tries to approximate the function as a quadratic and jump straight to the minimum. The "quasi-" is because it builds the quadratic estimate by taking gradients at successive points and updating the Hessian matrix based on the measured gradients. This is a reasonably effective method, and it spends almost all its time evaluating numerical gradients.
The function I am using takes 20-30 input parameters and returns a goodness-of-fit value (actually a log-likelihood). So computing the gradient is actually a matter of computing derivatives independently in 20-30 directions. If this could be done in parallel, a substantial speedup would be possible. Importantly, the (highly non-trivial) MINUIT code is built to permit "analytic" gradient functions to be passed in, so this is a change that can be made without reimplementing the entire algorithm.
The numerical derivative algorithm is in principle simple: in dimension d, choose a step size h and compute (f(x+h)-f(x-h))/2h. The trick is choosing the right value for h - too large and the function doesn't look like a parabola, too small and round-off error wrecks the result. MINUIT uses several tricks to find the right h. First, if you also have f(x), you can estimate the second derivative, and use it to work out how much truncation error there is in the (worse) first-order estimate (f(x+h)-f(x))/h. This error is a pessimistic estimate for the truncation error in the symmetric calculation, and it can be used to balance truncation error and round-off. Of course, if your initial h was wildly wrong, your corrected value might not be okay either, so you might have to repeat this until it stabilizes. And obviously having a good initial guess matters, so you want to keep around estimates of the gradient and second derivative from previous function evaluations. Plus you have to be careful about floating-point misbehaviour. So the whole function is kind of complicated. But here it is, converted from MINUIT code to parallel, coroutine-using form:
#!/usr/bin/env python # -*- coding: utf-8 -*- import concurrent.futures import asyncio import time import numpy as np @asyncio.coroutine def pmap(f, xs, loop=None): fs = [asyncio.async(f(*x), loop=loop) for x in xs] yield from asyncio.wait(fs) return [f.result() for f in fs] class Scalar: def __init__(self, f, step_initial=1e-8, f_eps=None, max_gradient_steps=3, step_tolerance=0.3, gradient_tolerance=0.05, dtype=float, pool=None): self.f = f self.x = None self.f_x = None self.dtype = dtype try: self.step_initial = self.dtype(step_initial) except TypeError: self.step_initial = np.asarray(step_initial) if f_eps is None: self.f_eps = np.finfo(self.dtype).eps else: self.f_eps = self.dtype(f_eps) self.up = 1. self.eps = 8*np.finfo(self.dtype).eps self.eps2 = 2*np.sqrt(np.finfo(self.dtype).eps) self._gradient = None self._gradient_steps = None self._gradient_2nd = None self.max_gradient_steps = max_gradient_steps self.step_tolerance = step_tolerance self.gradient_tolerance = gradient_tolerance if pool is None: self.pool = concurrent.futures.ProcessPoolExecutor() else: self.pool = pool self.loop = asyncio.get_event_loop() def __call__(self, x): if self.x is None or not np.all(x==self.x): self.x = x self.f_x = self.loop.run_until_complete(self._f(x)) return self.f_x @asyncio.coroutine def _f(self, x): return (yield from self.loop.run_in_executor(self.pool, self.f, x)) def _setup_initial_gradient(self): # FIXME: make sure dirin doesn't get too small dirin = 2*self.step_initial if isinstance(dirin, self.dtype): dirin = np.array([dirin]*self.n) self._gradient_2nd = 2*self.up/dirin**2 self._gradient_steps = 0.1*dirin self._gradient = self._gradient_2nd*dirin def gradient(self, x): x = np.asarray(x) self(x) self.n = len(x) if self._gradient is None: self._setup_initial_gradient() if len(self._gradient_steps) != len(x): raise ValueError("Dimensionality of input appears to have changed " "from %d to %d" % (len(self._gradient_steps), len(x))) if False: return np.array([ self.loop.run_until_complete(self._gradient1d(d)) for d in range(len(x))]) else: return np.array(self.loop.run_until_complete( pmap(self._gradient1d,[(d,) for d in range(len(x))]))) @asyncio.coroutine def _gradient1d(self, d): dfmin = 8*self.eps2*(np.abs(self.f_x)+self.up) vrysml = 8*self.eps**2 xtf = self.x[d] epspri = self.eps2+np.abs(self._gradient[d]*self.eps2) stepb4 = 0 for j in range(self.max_gradient_steps): optstp = np.sqrt(dfmin/(np.abs(self._gradient_2nd[d])+epspri)) step = max(optstp, np.abs(0.1*self._gradient_steps[d])) step = min(step, 10*self._gradient_steps[d]) step = max(step, vrysml) step = max(step, 8*np.abs(self.eps2*self.x[d])) if np.abs((step-stepb4)/step) < self.step_tolerance: break self._gradient_steps[d] = step stepb4 = step x1 = self.x.copy() x1[d] = xtf+step x2 = self.x.copy() x2[d] = xtf-step fs1, fs2 = yield from pmap(self._f, [(x1,), (x2,)]) step = (x1[d]-x2[d])/2 # in case of round-off grdb4 = self._gradient[d] self._gradient[d] = 0.5*(fs1-fs2)/step self._gradient_2nd[d] = (fs1+fs2 - 2.*self.f_x)/step**2 if (np.abs(grdb4-self._gradient[d]) /(np.abs(self._gradient[d])+dfmin/step) < self.gradient_tolerance): break return self._gradient[d] if __name__=='__main__': k = np.array([1e-6,1e-3,1,1e3,1e6]) def f(x): return np.sin(np.dot(k,x)) def df(x): return np.cos(np.dot(k,x))*k F = Scalar(f) for i in range(20): print() x = np.random.randn(len(k)) print(F.gradient(x)) print(df(x))Unfortunately, coroutines require python 3.4, while the rest of the system uses python 2. So I did a conversion to a thread-based approach, which was gratifyingly simple:
#!/usr/bin/env python # -*- coding: utf-8 -*- import concurrent.futures import time import numpy as np def pmap(f, xs, pool): ts = [pool.submit(f,*x) for x in xs] return [t.result() for t in ts] class Scalar: def __init__(self, f, step_initial=1e-8, f_eps=None, max_gradient_steps=3, step_tolerance=0.3, gradient_tolerance=0.05, dtype=float, pool=None): self.f = f self.x = None self.f_x = None self.dtype = dtype try: self.step_initial = self.dtype(step_initial) except TypeError: self.step_initial = np.asarray(step_initial) if f_eps is None: self.f_eps = np.finfo(self.dtype).eps else: self.f_eps = self.dtype(f_eps) self.up = 1. self.eps = 8*np.finfo(self.dtype).eps self.eps2 = 2*np.sqrt(np.finfo(self.dtype).eps) self._gradient = None self._gradient_steps = None self._gradient_2nd = None self.max_gradient_steps = max_gradient_steps self.step_tolerance = step_tolerance self.gradient_tolerance = gradient_tolerance if pool is None: self.pool = concurrent.futures.ProcessPoolExecutor() else: self.pool = pool self.thread_pool = concurrent.futures.ThreadPoolExecutor(65536) def __call__(self, x): if self.x is None or not np.all(x==self.x): self.x = x self.f_x = self.pool.submit(self.f,x).result() return self.f_x def _setup_initial_gradient(self): # FIXME: make sure dirin doesn't get too small dirin = 2*self.step_initial if isinstance(dirin, self.dtype): dirin = np.array([dirin]*self.n) self._gradient_2nd = 2*self.up/dirin**2 self._gradient_steps = 0.1*dirin self._gradient = self._gradient_2nd*dirin def gradient(self, x): x = np.asarray(x) self(x) self.n = len(x) if self._gradient is None: self._setup_initial_gradient() if len(self._gradient_steps) != len(x): raise ValueError("Dimensionality of input appears to have changed " "from %d to %d" % (len(self._gradient_steps), len(x))) return np.array(pmap(self._gradient1d,[(d,) for d in range(len(x))], self.thread_pool)) def _gradient1d(self, d): dfmin = 8*self.eps2*(np.abs(self.f_x)+self.up) vrysml = 8*self.eps**2 xtf = self.x[d] epspri = self.eps2+np.abs(self._gradient[d]*self.eps2) stepb4 = 0 for j in range(self.max_gradient_steps): optstp = np.sqrt(dfmin/(np.abs(self._gradient_2nd[d])+epspri)) step = max(optstp, np.abs(0.1*self._gradient_steps[d])) step = min(step, 10*self._gradient_steps[d]) step = max(step, vrysml) step = max(step, 8*np.abs(self.eps2*self.x[d])) if np.abs((step-stepb4)/step) < self.step_tolerance: break self._gradient_steps[d] = step stepb4 = step x1 = self.x.copy() x1[d] = xtf+step x2 = self.x.copy() x2[d] = xtf-step fs1, fs2 = pmap(self, [(x1,), (x2,)], self.thread_pool) step = (x1[d]-x2[d])/2 # in case of round-off grdb4 = self._gradient[d] self._gradient[d] = 0.5*(fs1-fs2)/step self._gradient_2nd[d] = (fs1+fs2 - 2.*self.f_x)/step**2 if (np.abs(grdb4-self._gradient[d]) /(np.abs(self._gradient[d])+dfmin/step) < self.gradient_tolerance): break return self._gradient[d] if __name__=='__main__': k = np.array([1e-6,1e-3,1,1e3,1e6]) def f(x): return np.sin(np.dot(k,x)) def df(x): return np.cos(np.dot(k,x))*k F = Scalar(f) for i in range(20): print() x = np.random.randn(len(k)) print(F.gradient(x)) print(df(x))Applying this to the actual triple system problem is a work in progress. There's also the very promising pure-python nested-sampling package nestle, which I would like to convert to use futures to do parallel computation, again because I want to apply it to the triple system. That's a longer-term problem, because my objective function currently crashes if you give it sufficiently-bogus orbital parameters.
2 comments:
I'm curious -- is there a reason you wrote your own pmap rather than using the .map method from the PoolExecutor objects?
Good point. I have my own pmap because asyncio does not provide one(?), and I wrote the coroutine version first. But of course one could simplify the threaded version by using pool.map.
Post a Comment