Mejorando el rendimiento en Python

Daniel Molina published on
10 min, 1900 words

Tags: python

Todo el código fuente de este post está disponible en github.

Tengo predilección por Python. Para mí, es un gran lenguaje para crear prototipos en muchas áreas. Para mi trabajo de investigación, suelo crear/diseñar algoritmos de optimización continua utilizando algoritmos evolutivos. Para estos algoritmos, lenguajes como C/C++ o Java son muy utilizados, especialmente por su buen rendimiento (para publicar, es habitual tener que hacer muchas comparaciones entre algoritmos, por lo que el rendimiento puede ser crítico. Sin embargo, para probar nuevas ideas, muchos autores utilizan otras herramientas como Mathlab que reduce el tiempo de desarrollo a costa de un mayor tiempo de computación.

Estoy de acuerdo en que Mathlab es genial para los algoritmos numéricos, pero sigo prefiriendo Python sobre Mathlab, porque me siento más cómodo con él, y tengo muchas bibliotecas, y es más sencillo llamar a código en otros lenguajes, escrito en C o Java. Eso nos permite aumentar el rendimiento, y me gusta probar cuánto se puede mejorar.

Hace varios meses, empecé a escribir mi algoritmo más exitoso, Algoritmos Meméticos basados en el Encadenamiento LS, en Python. Tenía varias dudas sobre el rendimiento, así que empiezo a escribir un elemento, un Algoritmo Genético de Estado Estable, en Python.

Llamando a código C/C++ desde python

El primer reto que tuve que afrontar fue permitir que mi programa en python utilizara las mismas funciones de referencia que otras implementaciones, CEC'2005 benchmark. Este benchmark define las funciones a optimizar, por lo que su función principal es evaluar mis soluciones, cuando cada solución es un vector de números reales, con un valor real de fitness. El código del benchmark fue implementado (por sus autores) en C/C++. Por lo tanto, mi código python tiene que llamar al código C++.

Para ello, he utilizado la librería boost::python, que es, en mi opinión, la forma más sencilla de llamar al código C/C++, especialmente cuando utilizamos el paquete numpy.

En mi caso, es muy sencillo, porque necesito pocas funciones:

#include <boost/python.hpp>
#include <boost/python/numeric.hpp>
#include <boost/python/list.hpp>
#include <iostream>
#include "cec2005/cec2005.h"
#include "cec2005/srandom.h"

using namespace boost::python;

Random r(new SRandom(12345679));

void set_function(int fun, int dim) {
    init_cec2005(&r, fun, dim);
}

double evalua(const numeric::array &el) {
   const tuple &shape = extract<tuple>(el.attr("shape"));
   unsigned n = boost::python::extract<unsigned>(shape[0]);
   double *tmp = new double[n];
  for(unsigned int i = 0; i < n; i++)
    {
      tmp[i] = boost::python::extract<double>(el[i]);
    }
  double result = eval_cec2005(tmp, n);
  delete tmp;
  return result;
}
...

BOOST_PYTHON_MODULE(libpycec2005)
{
    using namespace boost::python;
    numeric::array::set_module_and_type( "numpy", "ndarray");
    def("config", &set_function);
    def("evaluate", &evalua);
    ...
}

Más información en la buena documentación de boost::python.

Uno puede llamar al código C/C++, hemos implementado el algoritmo en código python. El código de prueba fue el siguiente:

from ssga import SSGA
from readargs import ArgsCEC05
import libpycec2005 as cec2005
import numpy

def check_dimension(option, opt, value):
    if value not in [2, 10, 30, 50]:
        raise OptionValueError(
            "option %s: invalid dimensionality value: %r" % (opt, value))

def main():
    """
    Main program
    """
    args = ArgsCEC05()

    if  args.hasError:
        args.print_help_exit()

    fun = args.function
    dim = args.dimension

    print "Function: %d" %fun
    print "Dimension: %d" %dim
    cec2005.config(fun, dim)
    domain = cec2005.domain(fun)
    print "Domain: ", domain
    ea = SSGA(domain=domain, size=60, dim=dim, fitness=cec2005.evaluate)

    for x in xrange(25):
        ea.run(maxeval=dim*10000)
        [bestsol, bestfit] = ea.getBest()
        print "BestSol: ", bestsol
        print "BestFitness: %e" %bestfit
        ea.reset()

if __name__ == "__main__":
    main()

Este código fuente ejecuta el algoritmo 25 veces, y en cada ejecución el algoritmo se detiene cuando se crean 10000*dim soluciones. Estas condiciones están indicadas en la especificación del benchmark. El único parámetro fue la función (-f, se utilizó la función 1 por por defecto) y la dimensión (-d) de 10, 30, 50.

Perfilando el tiempo de computación

¿Cuánto tiempo tarda? He cambiado xrange(25) por xrange(1) y hemos ejecutado su versión actual. El tiempo final fue de 7 minutos para la dimensión 10, y 21 minutos para la dimensión 30 (para una sola función).

Como me gusta hacer cosas más interesantes, que sólo esperar el tiempo de computación, yo uso el perfil, sólo una ejecución para la función, para detectar las funciones/método más costosos en tiempo de computación.

python -m cProfile runcec.py -f 1 -d 10

The output was the following:

        2943600 function calls (2943531 primitive calls) in 31.031 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
....
      1    0.001    0.001    0.126    0.126 ssga.py:1(<module>)
    99940    0.561    0.000   17.463    0.000 ssga.py:109(cross)
        1    0.000    0.000    0.000    0.000 ssga.py:123(reset)
        1    5.559    5.559   51.129   51.129 ssga.py:126(run)
        1    0.000    0.000    0.000    0.000 ssga.py:14(__init__)
        1    0.000    0.000    0.000    0.000 ssga.py:158(getBest)
        1    0.000    0.000    0.000    0.000 ssga.py:31(set_mutation_rate)
    99940    0.730    0.000    1.885    0.000 ssga.py:45(mutation)
    12438    0.286    0.000    0.758    0.000 ssga.py:50(mutationBGA)
        1    0.002    0.002    0.002    0.002 ssga.py:77(initPopulation)
   105883    1.101    0.000    5.604    0.000 ssga.py:89(updateWorst)
        1    0.000    0.000    0.000    0.000 ssga.py:9(SSGA)
    99940    1.049    0.000   20.617    0.000 ssga.py:97(getParents)
...

Con el perfil podemos observar los métodos más costosos de nuestro código: getParents (20 segundos), operador de cruce (17 segundos) y updateWorst (5 segundos). Estos métodos suponen el 85% del tiempo de computación, y los dos primeros el 74% del tiempo de cálculo.

Optimización del código

Eso demuestra que la mayor parte del tiempo de computación se debe a una minoría del código, sólo tres métodos. Si podemos optimizar estos métodos, nuestro código podría mejorado mucho.

Podemos utilizar de nuevo el paquete boost::python, pero es un poco tedioso utilizarlo. Por lo tanto, hemos utilizado el paquete cython. Con cython podemos optimizar el código fuente añadiendo información sobre los tipos.

Por ejemplo, en lugar del siguiente código

import numpy as np

def distance(ind1,ind2):
    """
    Euclidean distance
    ind1 -- first array to compare
    ind2 -- second array to compare

    Return euclidean distance between the individuals

    >>> from numpy.random import rand
    >>> import numpy as np
    >>> dim = 30
    >>> sol = rand(dim)
    >>> distance(sol,sol)
    0.0
    >>> ref=np.zeros(dim)
    >>> dist=distance(sol,ref)
    >>> dist > 0
    True
    >>> dist2 = distance(sol*2,ref)
    >>> 2*dist == dist2
    True
    """
    dif = ind1-ind2
    sum = (dif*dif).sum()
    return math.sqrt(sum)

Podemos escribir:

cimport numpy as np
cimport cython
DTYPE = np.double
ctypedef np.double_t DTYPE_t
ctypedef np.int_t BTYPE_t

def distance(np.ndarray[DTYPE_t, ndim=1]ind1, np.ndarray[DTYPE_t, ndim=1] ind2):
    """
    Euclidean distance
    ind1 -- first array to compare
    ind2 -- second array to compare

    ....
    """
    cdef np.ndarray[DTYPE_t, ndim=1] dif = ind1-ind2
    cdef double sum = (dif*dif).sum()
    return math.sqrt(sum)

Podemos ver que sigue siendo muy legible. sólo hemos puesto información sobre el tipo y dimensión en los parámetros del vector y sobre las variables, usando la palabra clave cdef.

Veamos como ejemplo el primer método, el operador de cruce, implementado en el método crossBLX:

import numpy as np
import math

def crossBLX(mother,parent,domain,alpha):
    """
    crossover operator BLX-alpha

    mother -- mother (first individual)
    parent -- parent (second individual)
    domain -- domain to check
    alpha  -- parameter alpha

    Returns the new children following the expression children = random(x-alpha*dif, y+alpha*dif),
                where dif=abs(x,y) and x=lower(mother,parents), y=upper(mother,parents)

    >>> import numpy as np
    >>> low=-5
    >>> upper = 5
    >>> dim=30
    >>> sol = np.array([1,2,3,2,1])
    >>> crossBLX(sol,sol,[low,upper],0)
    array([ 1.,  2.,  3.,  2.,  1.])
    """
    diff = abs(mother-parent)
    dim = mother.size
    I=diff*alpha
    points = np.array([mother,parent])
    A=np.amin(points,axis=0)-I
    B=np.amax(points,axis=0)+I
    children = np.random.uniform(A,B,dim)
    [low,high]=domain
    return np.clip(children, low, high)

Podemos ver que es muy sencillo de implementar usando numpy, pero sigue siendo muy lento. Con cython he definido implementar directamente las numerosas operaciones, el siguiente código:

def crossBLX(np.ndarray[DTYPE_t, ndim=1] mother,np.ndarray[DTYPE_t, ndim=1] parent,list domain, double alpha):
    """
    ...
    """
    cdef np.ndarray[DTYPE_t, ndim=1] C, r
    cdef int low, high, dim
    cdef double x, y
    cdef double I, A, B
    cdef unsigned i
    [low,high]=domain
    dim = mother.shape[0]
    C = np.zeros(dim)
    r = random.randreal(0,1,dim)

    for i in range(dim):
        if mother[i] < parent[i]:
           (x,y) = (mother[i],parent[i])
        else:
           (y,x) = (mother[i],parent[i])

        I = alpha*(y-x)
        A=x-I
        B=y+I

        if (A < low):
            A = low
        if (B > high):
            B = high

        C[i] = A+r[i]*(B-A)

    return C

Es cierto que el código fuente es más complicado, pero sigue siendo muy legible. He comparado las dos implementaciones para asegurarme de que ambas devuelven los mismos valores.

Midiendo la mejora

¿Cuánto cuestan estos pequeños cambios en el código? He vuelto a perfilar el código fuente y me da:

         1020045 function calls (1019976 primitive calls) in 18.003 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
....
        1    0.001    0.001    0.127    0.127 ssga.py:1(<module>)
    99940    0.425    0.000    2.432    0.000 ssga.py:109(cross)
        1    0.000    0.000    0.000    0.000 ssga.py:123(reset)
        1    5.415    5.415   17.864   17.864 ssga.py:126(run)
        1    0.000    0.000    0.000    0.000 ssga.py:14(__init__)
        1    0.000    0.000    0.000    0.000 ssga.py:158(getBest)
        1    0.000    0.000    0.000    0.000 ssga.py:31(set_mutation_rate)
    99940    0.699    0.000    2.006    0.000 ssga.py:45(mutation)
    12544    0.338    0.000    0.929    0.000 ssga.py:50(mutationBGA)
        1    0.002    0.002    0.002    0.002 ssga.py:77(initPopulation)
   105959    0.775    0.000    1.343    0.000 ssga.py:89(updateWorst)
        1    0.000    0.000    0.000    0.000 ssga.py:9(SSGA)
    99940    0.940    0.000    6.665    0.000 ssga.py:97(getParents)
....

Podemos ver la mejora obtenida.

MethodPythonCython
cross :17.42.4
getParents :20.66.6
updateWorst :5.61.3
Total43.610.3

El nuevo código tarda sólo un 23% del tiempo de cálculo del código anterior. Con estos cambios, hemos reducido el tiempo total de 51 segundos a 18 de código.

En perspectiva

Ahora, ejecuto el código fuente sin el perfil, y pruebo el código fuente obteniendo el siguiente tiempo:

Methoddim=10dim=30dim=50
Python44s3240s (54m)--
Cython10s28s48s
Improvement77%99%---

En la siguiente tabla, comprobamos el tiempo máximo para una y 25 ejecuciones (el tiempo depende de la función utilizada).

#functionsdim=10dim=30dim=50
110s/18s28s/40s48s/1m
253/7m15/21m38m/

Así, el tiempo total de cálculo es de 7 minutos para la dimensión 10, y de 21 minutos para la dimensión 30. Estas cifras son muy aceptables, especialmente porque podemos probar en paralelo las diferentes funciones en un cluster de ordenadores. Desgraciadamente, una implementación en Mathlab no sólo llevaría más tiempo, sino que sino que además, por razones de licencia, no podría ejecutarse en paralelo sin límite.

En resumen, podemos utilizar código python, no sólo para crear prototipos experimentales, sino también para crear prototipos funcionales.

Y sobre el posible problema de las pruebas, he estado trabajando en ello, pero creo que es suficiente para un post, ¿no? :-)

Todo el código referido en el post, tanto en python como en cython, está disponible en github, por si quieres verlo.