Automatic Differentiation Design

Descirbe

用python实现实数的自动微分计算图。

  • 使用数据结构构造计算图,支持加减乘除等二元运算符以及对数指数三角函数等一元运算。

  • 支持定义后赋值方式

  • 实现自动微分

计算图实现有eagerlazy 两种模式,前者每个Node的值都是立即求得的,而后者首先构建公式然后再整体求值,这么做可以方便进行剪枝等优化操作,但不方便调试。

lazy模式的执行方式为,首先对计算图进行拓扑排序,然后按照拓扑排序的顺序从前往后依次求值。代码中Seesion.run()方法演示了这个过程。其实着整个程序已经不仅仅是“自动微分”的演示了,而是tf图计算流程的演示,包括前向和后向(真实的算法模型中会使用更多种类的Op、模型本身也会更加复杂,但求解流程类似)

Code

from typing import Optional, List, Tuple, Dict, Union
import math

# eager, lazy

class Variable(object):
    global_id = 0

    def __init__(self, *inputs):
        self.id = Variable.global_id
        Variable.global_id += 1

        self.inputs = inputs
        self.grad = 0.0
        # self.evaluate()
        # print(f"eager exec: {self}")

    def __repr__(self):
        return f"{self.__class__.__name__}{self.id}(value={self.value}, grad={self.grad})"

    def __add__(self, other):
        return Add(self, other)

    def __sub__(self, other):
        return Sub(self, other)
    
    def __mul__(self, other):
        return Mul(self, other)

    def __truediv__(self, other):
        return Div(self, other)

    def input2values(self):
        return [i.value if isinstance(i, Variable) else i for i in self.inputs]

    def evaluate(self):
        self.value = self.compute(self.input2values())

    def compute(self, inputs):
        """ Forward """
        return inputs[0]

    def gradient(self, inputs, output_grad):
        """ backward """
        return [output_grad]

class Placeholder(Variable):
    def __init__(self, ):
        super().__init__(None)

class Constant(Variable):
    def __init__(self, value):
        super().__init__(value)
        self.grad = None

class Operator(Variable):
    def __init__(self, *inputs):
        super().__init__(*inputs)

class Add(Operator):
    def compute(self, inputs):
        return inputs[0] + inputs[1]

    def gradient(self, inputs, output_grad):
        return [output_grad, output_grad] # gradient of a and b

class Sub(Operator):
    def compute(self, inputs):
        return inputs[0] - inputs[1]
    
    def gradient(self, inputs, output_grad):
        return [output_grad, - output_grad]

class Mul(Operator):
    def compute(self, inputs):
        return inputs[0] * inputs[1]

    def gradient(self, inputs, output_grad):
        return [inputs[1] * output_grad, inputs[0] * output_grad]

class Div(Operator):
    def compute(self, inputs):
        return inputs[0] / inputs[1]

    def gradient(self, inputs, output_grad):
        return [1.0 / inputs[1] * output_grad, (- inputs[0] / inputs[1]**2) * output_grad]

class Ln(Operator):
    def compute(self, inputs):
        return math.log(inputs[0])

    def gradient(self, inputs, output_grad):
        return [1.0/inputs[0] * output_grad]

class Sin(Operator):
    def compute(self, inputs):
        return math.sin(inputs[0])

    def gradient(self, inputs, output_grad):
        return [math.cos(inputs[0]) * output_grad]

class Seesion:
    def __init__(self):
        self.topo_list: List[Variable] = [] # 拓扑排序的顺序就是正向求值的顺序
        self.root = None

    def topological_sorting(self, root: Variable):

        def dfs(topo_list: List[Variable], node):
            if not isinstance(node, Variable):
                return
            for n in node.inputs:
                dfs(topo_list, n)
            topo_list.append(node) # 同一个节点可以添加多次,他们的梯度会累加

        topo_list = []
        dfs(topo_list, root)

        self.topo_list: List[Variable] = topo_list # 拓扑排序的顺序就是正向求值的顺序
        self.root = root

    def run(self, root, feed_dict: Dict[str, Union[int, float]] = None):
        """
        按照拓扑排序的顺序对计算图求值。注意:因为我们之前对node采用了eager模式,
        实际上每个node值之前已经计算好了,但为了演示lazy计算的效果,这里使用拓扑
        排序又计算了一遍。
        """
        self.topological_sorting(root)
        node_evaluated = set() # 保证每个node只被求值一次
        print("Forward:")
        for node in self.topo_list:
            if node.inputs[0] is None:
                node.inputs = [feed_dict[node]]

            if node not in node_evaluated:
                node.evaluate()
                node_evaluated.add(node)
                print(f"evaluate: {node}")

        return self.root.value

    def gradients(self):
        reverse_topo = list(reversed(self.topo_list)) # 按照拓扑排序的反向开始微分
        reverse_topo[0].grad = 1.0 # 输出节点梯度是1.0

        print("Backward:")
        for node in reverse_topo:
            grad = node.gradient(node.input2values(), node.grad)
            # 将梯度累加到每一个输入变量的梯度上
            for i, g in zip(node.inputs, grad):
                if isinstance(i, Variable) and not isinstance(i, Constant):
                    i.grad += g
        # print("AFTER AUTODIFF:")
        # for node in reverse_topo:
        #     print(node)

if __name__ == "__main__":
    x2 = Variable(5.0)
    x1 = Placeholder()
    # x1 = Variable(2.0)
     # y = ln(x1) + x1*x2 - sin(x2)
    f = Sub(Add(Ln(x1), Mul(x1, x2)), Sin(x2))
    sess = Seesion()
    out = sess.run(f, feed_dict={x1: 2.0})
    sess.gradients() # 反向计算 自动微分
    print(f"f = {out:.3f}")
    print(f"x1.grad = {x1.grad:.3f}")
    print(f"x2.grad = {x2.grad:.3f}")

    # f = ln(x+2)
    # sess = Seesion()
    # x1 = Variable(2.0)
    # c = Constant(2.0)
    # f = Ln(Add(x1, c))
    # out = sess.run(f)
    # sess.gradients()
    # print(x1.grad)

    # sess = Seesion()
    # x1 = Variable(2.0)
    # c = Constant(2.0)
    # f = Ln(x1 + c)
    # out = sess.run(f)
    # sess.gradients()
    # print(x1.grad)

    sess = Seesion()
    x1 = Variable(2.0)
    x2 = Variable(2.0)

    f = x1 / x2
    out = sess.run(f)
    sess.gradients()
    print(f"f = {out:.3f}")
    print(f"x1.grad = {x1.grad:.3f}")
    print(f"x2.grad = {x2.grad:.3f}")

Reference