Module dadi.DFE.Cache1D_mod
Initially developed by Bernard Kim and published as fitdadi Kim, Huber, Lohmueller (2017) Genetics. Modified version of the script found at: https://doi.org/10.1534/genetics.116.197145 . Based on scripts from: https://groups.google.com/forum/#!topic/dadi-user/4xspqlITcvc .
Expand source code
"""
Initially developed by Bernard Kim and published as fitdadi
Kim, Huber, Lohmueller (2017) Genetics.
Modified version of the script found at: https://doi.org/10.1534/genetics.116.197145 .
Based on scripts from:
https://groups.google.com/forum/#!topic/dadi-user/4xspqlITcvc .
"""
import operator
import sys, traceback
import numpy as np
import scipy.stats.distributions
import scipy.integrate
import dadi
from dadi import Numerics, Spectrum
class Cache1D:
def __init__(self, params, ns, demo_sel_func, pts,
gamma_bounds=(1e-4, 2000), gamma_pts=500,
additional_gammas=[],
cpus=None, gpus=0, verbose=False):
"""
params: Optimized demographic parameters
ns: Sample size(s) for cached spectra
demo_sel_func: DaDi demographic function with selection.
gamma must be the last argument.
pts: Integration/extrapolation grid points settings for demo_sel_func
gamma_bounds: Range of gammas to cache spectra for.
gamma_pts: Number of gamma grid points over which to integrate
additional_gammas: Additional positive values of gamma to cache for
cpus: For multiprocessing, number of CPU jobs to launch.
If None (default), then all CPUs available will be used.
gpus: For multiprocessing, number of GPU jobs to launch.
verbose: If True, print messages to track progress of cache generation.
"""
self.params, self.ns, self.pts = tuple(params), tuple(ns), tuple(pts)
#Create a vector of gammas that are log-spaced over an interval
self.gammas = -np.logspace(np.log10(gamma_bounds[1]),
np.log10(gamma_bounds[0]), gamma_pts)
# Record negative gammas, for later use
self.neg_gammas = self.gammas
# Add additional gammas to cache
self.gammas = np.concatenate((self.gammas, additional_gammas))
self.spectra = [None]*len(self.gammas)
self.params = params
self.ns = ns
self.pts = pts
self.func_name = demo_sel_func.__name__
if cpus is None:
import multiprocessing
cpus = multiprocessing.cpu_count()
if not cpus > 1 or gpus > 0: #for running with a single thread
self._single_process(verbose, demo_sel_func)
else: #for running with with multiple cores
self._multiple_processes(cpus, gpus, verbose, demo_sel_func)
demo_sel_extrap_func = Numerics.make_extrap_func(demo_sel_func)
self.neu_spec = demo_sel_extrap_func(tuple(self.params)+(0,), self.ns, self.pts)
self.spectra = np.array(self.spectra)
def _single_process(self, verbose, demo_sel_func):
demo_sel_extrap_func = Numerics.make_extrap_func(demo_sel_func)
for ii, gamma in enumerate(self.gammas):
self.spectra[ii] = demo_sel_extrap_func(tuple(self.params)+(gamma,), self.ns,
self.pts)
if verbose:
print('{0}: {1}'.format(ii, gamma))
def _multiple_processes(self, cpus, gpus, verbose, demo_sel_func):
from multiprocessing import Manager, Process, cpu_count
with Manager() as manager:
work = manager.Queue(cpus + gpus)
results = manager.list()
# Assemble pool of workers
pool = []
for ii in range(cpus):
p = Process(target=self._worker_sfs,
args=(work, results, demo_sel_func, self.params, self.ns, self.pts, verbose, False))
p.start()
pool.append(p)
for ii in range(gpus):
p = Process(target=self._worker_sfs,
args=(work, results, demo_sel_func, self.params, self.ns, self.pts, verbose, True))
p.start()
pool.append(p)
# Put all jobs on queue
for ii, gamma in enumerate(self.gammas):
work.put((ii, gamma))
# Put commands on queue to close out workers
for jj in range(cpus+gpus):
work.put(None)
# Stop workers
for p in pool:
p.join()
# Collect results
for ii, sfs in results:
self.spectra[ii] = sfs
def _worker_sfs(self, in_queue, outlist, popn_func, params, ns, pts, verbose, usegpu):
"""
Worker function -- used to generate SFSes for
single values of gamma.
"""
popn_func_ex = Numerics.make_extrap_func(popn_func)
dadi.cuda_enabled(usegpu)
while True:
item = in_queue.get()
if item is None:
return
ii, gamma = item
try:
sfs = popn_func_ex(tuple(params)+(gamma,), ns, pts)
if verbose:
print('{0}: {1}'.format(ii, gamma))
outlist.append((ii, sfs))
except BaseException as inst:
# If an exception occurs in the worker function, print an error
# and populate the outlist with an item that will cause a later crash.
tb = sys.exc_info()[2]
traceback.print_tb(tb)
outlist.append(inst)
def integrate(self, params, ns, sel_dist, theta, pts=None, exterior_int=True):
"""
Integrate spectra over a univariate prob. dist. for negative gammas.
params: Parameters for sel_dist
ns: Ignored
sel_dist: Univariate probability distribution,
taking in arguments (xx, params)
theta: Population-scaled mutation rate
pts: Ignored
exterior_int: If False, do not integrate outside sampled domain.
Note also that the ns and pts arguments are ignored. They are only
present for compatibility with other dadi functions that apply to
demographic models.
"""
# Restrict ourselves to negative gammas
Nneg = len(self.neg_gammas)
spectra = self.spectra[:Nneg]
# Weights for integration
weights = sel_dist(-self.neg_gammas, params)
weighted_spectra = 0*spectra
for ii, w in enumerate(weights):
weighted_spectra[ii] = w*spectra[ii]
fs = np.trapz(weighted_spectra, self.neg_gammas, axis=0)
if not exterior_int:
return Spectrum(theta*fs)
smallest_gamma = self.neg_gammas[-1]
largest_gamma = self.neg_gammas[0]
# Compute weight for the effectively neutral portion. Not using
# CDF function because want this to be able to compute weight
# for arbitrary mass functions
weight_neu, err_neu = scipy.integrate.quad(sel_dist, 0, -smallest_gamma,
args=params)
# compute weight for the effectively lethal portion
weight_del, err = scipy.integrate.quad(sel_dist, -largest_gamma, np.inf,
args=params)
fs += self.neu_spec*weight_neu
fs += spectra[0]*weight_del
return Spectrum(theta*fs)
def integrate_point_pos(self, params, ns, sel_dist, theta, demo_sel_func=None,
Npos=1, pts=None, exterior_int=True):
"""
Integrate spectra over a univariate prob. dist. for negative gammas,
plus one or more point masses of positive selection.
params: Parameters. The last Npos*2 are assumed to be the proportion of
positive selection and the gamma for each point mass, in the order
(ppos1, gammapos1, ppos2, gammapos2, ...).
The remaining parameters are for the continuous point mass.
ns: Ignored
sel_dist: Univariate probability distribution,
taking in arguments (xx, params)
theta: Population-scaled mutation rate
demo_sel_func: DaDi demographic function with selection.
gamma must be the last argument.
Npos: Number of positive point masses to model.
pts: Ignored, evaluation of demo_self_func will use pts from orignal
caching.
exterior_int: If False, do not integrate outside sampled domain.
Note also that the ns and pts arguments are ignored. They are only
present for compatibility with other dadi functions that apply to
demographic models.
"""
pdf_params, ppos, gammapos = params[:-2*Npos], params[-2], params[-1]
ppos_l, gammapos_l = params[-2*Npos::2], params[-2*Npos+1::2]
pdf_fs = self.integrate(pdf_params, None, sel_dist, theta, None,
exterior_int=exterior_int)
result = (1-np.sum(ppos_l))*pdf_fs
if demo_sel_func is not None:
demo_sel_func = Numerics.make_extrap_func(demo_sel_func)
for ppos, gammapos in zip(ppos_l, gammapos_l):
if gammapos not in self.gammas:
if demo_sel_func is None:
raise IndexError('Failed to find requested gammapos={0:.4f} '
'in Cache1D spectra. Was it included in '
'additional_gammas during cache generation?'.format(gammapos))
pos_fs = theta*demo_sel_func(tuple(self.params) + (gammapos,),
self.ns, self.pts)
self.gammas = np.append(self.gammas, gammapos)
self.spectra = np.append(self.spectra, [pos_fs.data], axis=0)
ii = list(self.gammas).index(gammapos)
pos_fs = Spectrum(self.spectra[ii])
result += ppos*pos_fs
return result
Classes
class Cache1D (params, ns, demo_sel_func, pts, gamma_bounds=(0.0001, 2000), gamma_pts=500, additional_gammas=[], cpus=None, gpus=0, verbose=False)
-
params: Optimized demographic parameters ns: Sample size(s) for cached spectra demo_sel_func: DaDi demographic function with selection. gamma must be the last argument. pts: Integration/extrapolation grid points settings for demo_sel_func gamma_bounds: Range of gammas to cache spectra for. gamma_pts: Number of gamma grid points over which to integrate additional_gammas: Additional positive values of gamma to cache for cpus: For multiprocessing, number of CPU jobs to launch. If None (default), then all CPUs available will be used. gpus: For multiprocessing, number of GPU jobs to launch. verbose: If True, print messages to track progress of cache generation.
Expand source code
class Cache1D: def __init__(self, params, ns, demo_sel_func, pts, gamma_bounds=(1e-4, 2000), gamma_pts=500, additional_gammas=[], cpus=None, gpus=0, verbose=False): """ params: Optimized demographic parameters ns: Sample size(s) for cached spectra demo_sel_func: DaDi demographic function with selection. gamma must be the last argument. pts: Integration/extrapolation grid points settings for demo_sel_func gamma_bounds: Range of gammas to cache spectra for. gamma_pts: Number of gamma grid points over which to integrate additional_gammas: Additional positive values of gamma to cache for cpus: For multiprocessing, number of CPU jobs to launch. If None (default), then all CPUs available will be used. gpus: For multiprocessing, number of GPU jobs to launch. verbose: If True, print messages to track progress of cache generation. """ self.params, self.ns, self.pts = tuple(params), tuple(ns), tuple(pts) #Create a vector of gammas that are log-spaced over an interval self.gammas = -np.logspace(np.log10(gamma_bounds[1]), np.log10(gamma_bounds[0]), gamma_pts) # Record negative gammas, for later use self.neg_gammas = self.gammas # Add additional gammas to cache self.gammas = np.concatenate((self.gammas, additional_gammas)) self.spectra = [None]*len(self.gammas) self.params = params self.ns = ns self.pts = pts self.func_name = demo_sel_func.__name__ if cpus is None: import multiprocessing cpus = multiprocessing.cpu_count() if not cpus > 1 or gpus > 0: #for running with a single thread self._single_process(verbose, demo_sel_func) else: #for running with with multiple cores self._multiple_processes(cpus, gpus, verbose, demo_sel_func) demo_sel_extrap_func = Numerics.make_extrap_func(demo_sel_func) self.neu_spec = demo_sel_extrap_func(tuple(self.params)+(0,), self.ns, self.pts) self.spectra = np.array(self.spectra) def _single_process(self, verbose, demo_sel_func): demo_sel_extrap_func = Numerics.make_extrap_func(demo_sel_func) for ii, gamma in enumerate(self.gammas): self.spectra[ii] = demo_sel_extrap_func(tuple(self.params)+(gamma,), self.ns, self.pts) if verbose: print('{0}: {1}'.format(ii, gamma)) def _multiple_processes(self, cpus, gpus, verbose, demo_sel_func): from multiprocessing import Manager, Process, cpu_count with Manager() as manager: work = manager.Queue(cpus + gpus) results = manager.list() # Assemble pool of workers pool = [] for ii in range(cpus): p = Process(target=self._worker_sfs, args=(work, results, demo_sel_func, self.params, self.ns, self.pts, verbose, False)) p.start() pool.append(p) for ii in range(gpus): p = Process(target=self._worker_sfs, args=(work, results, demo_sel_func, self.params, self.ns, self.pts, verbose, True)) p.start() pool.append(p) # Put all jobs on queue for ii, gamma in enumerate(self.gammas): work.put((ii, gamma)) # Put commands on queue to close out workers for jj in range(cpus+gpus): work.put(None) # Stop workers for p in pool: p.join() # Collect results for ii, sfs in results: self.spectra[ii] = sfs def _worker_sfs(self, in_queue, outlist, popn_func, params, ns, pts, verbose, usegpu): """ Worker function -- used to generate SFSes for single values of gamma. """ popn_func_ex = Numerics.make_extrap_func(popn_func) dadi.cuda_enabled(usegpu) while True: item = in_queue.get() if item is None: return ii, gamma = item try: sfs = popn_func_ex(tuple(params)+(gamma,), ns, pts) if verbose: print('{0}: {1}'.format(ii, gamma)) outlist.append((ii, sfs)) except BaseException as inst: # If an exception occurs in the worker function, print an error # and populate the outlist with an item that will cause a later crash. tb = sys.exc_info()[2] traceback.print_tb(tb) outlist.append(inst) def integrate(self, params, ns, sel_dist, theta, pts=None, exterior_int=True): """ Integrate spectra over a univariate prob. dist. for negative gammas. params: Parameters for sel_dist ns: Ignored sel_dist: Univariate probability distribution, taking in arguments (xx, params) theta: Population-scaled mutation rate pts: Ignored exterior_int: If False, do not integrate outside sampled domain. Note also that the ns and pts arguments are ignored. They are only present for compatibility with other dadi functions that apply to demographic models. """ # Restrict ourselves to negative gammas Nneg = len(self.neg_gammas) spectra = self.spectra[:Nneg] # Weights for integration weights = sel_dist(-self.neg_gammas, params) weighted_spectra = 0*spectra for ii, w in enumerate(weights): weighted_spectra[ii] = w*spectra[ii] fs = np.trapz(weighted_spectra, self.neg_gammas, axis=0) if not exterior_int: return Spectrum(theta*fs) smallest_gamma = self.neg_gammas[-1] largest_gamma = self.neg_gammas[0] # Compute weight for the effectively neutral portion. Not using # CDF function because want this to be able to compute weight # for arbitrary mass functions weight_neu, err_neu = scipy.integrate.quad(sel_dist, 0, -smallest_gamma, args=params) # compute weight for the effectively lethal portion weight_del, err = scipy.integrate.quad(sel_dist, -largest_gamma, np.inf, args=params) fs += self.neu_spec*weight_neu fs += spectra[0]*weight_del return Spectrum(theta*fs) def integrate_point_pos(self, params, ns, sel_dist, theta, demo_sel_func=None, Npos=1, pts=None, exterior_int=True): """ Integrate spectra over a univariate prob. dist. for negative gammas, plus one or more point masses of positive selection. params: Parameters. The last Npos*2 are assumed to be the proportion of positive selection and the gamma for each point mass, in the order (ppos1, gammapos1, ppos2, gammapos2, ...). The remaining parameters are for the continuous point mass. ns: Ignored sel_dist: Univariate probability distribution, taking in arguments (xx, params) theta: Population-scaled mutation rate demo_sel_func: DaDi demographic function with selection. gamma must be the last argument. Npos: Number of positive point masses to model. pts: Ignored, evaluation of demo_self_func will use pts from orignal caching. exterior_int: If False, do not integrate outside sampled domain. Note also that the ns and pts arguments are ignored. They are only present for compatibility with other dadi functions that apply to demographic models. """ pdf_params, ppos, gammapos = params[:-2*Npos], params[-2], params[-1] ppos_l, gammapos_l = params[-2*Npos::2], params[-2*Npos+1::2] pdf_fs = self.integrate(pdf_params, None, sel_dist, theta, None, exterior_int=exterior_int) result = (1-np.sum(ppos_l))*pdf_fs if demo_sel_func is not None: demo_sel_func = Numerics.make_extrap_func(demo_sel_func) for ppos, gammapos in zip(ppos_l, gammapos_l): if gammapos not in self.gammas: if demo_sel_func is None: raise IndexError('Failed to find requested gammapos={0:.4f} ' 'in Cache1D spectra. Was it included in ' 'additional_gammas during cache generation?'.format(gammapos)) pos_fs = theta*demo_sel_func(tuple(self.params) + (gammapos,), self.ns, self.pts) self.gammas = np.append(self.gammas, gammapos) self.spectra = np.append(self.spectra, [pos_fs.data], axis=0) ii = list(self.gammas).index(gammapos) pos_fs = Spectrum(self.spectra[ii]) result += ppos*pos_fs return result
Methods
def integrate(self, params, ns, sel_dist, theta, pts=None, exterior_int=True)
-
Integrate spectra over a univariate prob. dist. for negative gammas.
params: Parameters for sel_dist ns: Ignored sel_dist: Univariate probability distribution, taking in arguments (xx, params) theta: Population-scaled mutation rate pts: Ignored exterior_int: If False, do not integrate outside sampled domain.
Note also that the ns and pts arguments are ignored. They are only present for compatibility with other dadi functions that apply to demographic models.
Expand source code
def integrate(self, params, ns, sel_dist, theta, pts=None, exterior_int=True): """ Integrate spectra over a univariate prob. dist. for negative gammas. params: Parameters for sel_dist ns: Ignored sel_dist: Univariate probability distribution, taking in arguments (xx, params) theta: Population-scaled mutation rate pts: Ignored exterior_int: If False, do not integrate outside sampled domain. Note also that the ns and pts arguments are ignored. They are only present for compatibility with other dadi functions that apply to demographic models. """ # Restrict ourselves to negative gammas Nneg = len(self.neg_gammas) spectra = self.spectra[:Nneg] # Weights for integration weights = sel_dist(-self.neg_gammas, params) weighted_spectra = 0*spectra for ii, w in enumerate(weights): weighted_spectra[ii] = w*spectra[ii] fs = np.trapz(weighted_spectra, self.neg_gammas, axis=0) if not exterior_int: return Spectrum(theta*fs) smallest_gamma = self.neg_gammas[-1] largest_gamma = self.neg_gammas[0] # Compute weight for the effectively neutral portion. Not using # CDF function because want this to be able to compute weight # for arbitrary mass functions weight_neu, err_neu = scipy.integrate.quad(sel_dist, 0, -smallest_gamma, args=params) # compute weight for the effectively lethal portion weight_del, err = scipy.integrate.quad(sel_dist, -largest_gamma, np.inf, args=params) fs += self.neu_spec*weight_neu fs += spectra[0]*weight_del return Spectrum(theta*fs)
def integrate_point_pos(self, params, ns, sel_dist, theta, demo_sel_func=None, Npos=1, pts=None, exterior_int=True)
-
Integrate spectra over a univariate prob. dist. for negative gammas, plus one or more point masses of positive selection.
params: Parameters. The last Npos*2 are assumed to be the proportion of positive selection and the gamma for each point mass, in the order (ppos1, gammapos1, ppos2, gammapos2, …). The remaining parameters are for the continuous point mass. ns: Ignored sel_dist: Univariate probability distribution, taking in arguments (xx, params) theta: Population-scaled mutation rate demo_sel_func: DaDi demographic function with selection. gamma must be the last argument. Npos: Number of positive point masses to model. pts: Ignored, evaluation of demo_self_func will use pts from orignal caching. exterior_int: If False, do not integrate outside sampled domain.
Note also that the ns and pts arguments are ignored. They are only present for compatibility with other dadi functions that apply to demographic models.
Expand source code
def integrate_point_pos(self, params, ns, sel_dist, theta, demo_sel_func=None, Npos=1, pts=None, exterior_int=True): """ Integrate spectra over a univariate prob. dist. for negative gammas, plus one or more point masses of positive selection. params: Parameters. The last Npos*2 are assumed to be the proportion of positive selection and the gamma for each point mass, in the order (ppos1, gammapos1, ppos2, gammapos2, ...). The remaining parameters are for the continuous point mass. ns: Ignored sel_dist: Univariate probability distribution, taking in arguments (xx, params) theta: Population-scaled mutation rate demo_sel_func: DaDi demographic function with selection. gamma must be the last argument. Npos: Number of positive point masses to model. pts: Ignored, evaluation of demo_self_func will use pts from orignal caching. exterior_int: If False, do not integrate outside sampled domain. Note also that the ns and pts arguments are ignored. They are only present for compatibility with other dadi functions that apply to demographic models. """ pdf_params, ppos, gammapos = params[:-2*Npos], params[-2], params[-1] ppos_l, gammapos_l = params[-2*Npos::2], params[-2*Npos+1::2] pdf_fs = self.integrate(pdf_params, None, sel_dist, theta, None, exterior_int=exterior_int) result = (1-np.sum(ppos_l))*pdf_fs if demo_sel_func is not None: demo_sel_func = Numerics.make_extrap_func(demo_sel_func) for ppos, gammapos in zip(ppos_l, gammapos_l): if gammapos not in self.gammas: if demo_sel_func is None: raise IndexError('Failed to find requested gammapos={0:.4f} ' 'in Cache1D spectra. Was it included in ' 'additional_gammas during cache generation?'.format(gammapos)) pos_fs = theta*demo_sel_func(tuple(self.params) + (gammapos,), self.ns, self.pts) self.gammas = np.append(self.gammas, gammapos) self.spectra = np.append(self.spectra, [pos_fs.data], axis=0) ii = list(self.gammas).index(gammapos) pos_fs = Spectrum(self.spectra[ii]) result += ppos*pos_fs return result