Automatic Differentiation Design¶
Descirbe¶
用python实现实数的自动微分计算图。
使用数据结构构造计算图,支持加减乘除等二元运算符以及对数指数三角函数等一元运算。
支持定义后赋值方式
实现自动微分
计算图实现有eager 和 lazy 两种模式,前者每个Node的值都是立即求得的,而后者首先构建公式然后再整体求值,这么做可以方便进行剪枝等优化操作,但不方便调试。
lazy模式的执行方式为,首先对计算图进行拓扑排序,然后按照拓扑排序的顺序从前往后依次求值。代码中Seesion.run()方法演示了这个过程。其实着整个程序已经不仅仅是“自动微分”的演示了,而是tf图计算流程的演示,包括前向和后向(真实的算法模型中会使用更多种类的Op、模型本身也会更加复杂,但求解流程类似)
Code¶
from typing import Optional, List, Tuple, Dict, Union
import math
# eager, lazy
class Variable(object):
global_id = 0
def __init__(self, *inputs):
self.id = Variable.global_id
Variable.global_id += 1
self.inputs = inputs
self.grad = 0.0
# self.evaluate()
# print(f"eager exec: {self}")
def __repr__(self):
return f"{self.__class__.__name__}{self.id}(value={self.value}, grad={self.grad})"
def __add__(self, other):
return Add(self, other)
def __sub__(self, other):
return Sub(self, other)
def __mul__(self, other):
return Mul(self, other)
def __truediv__(self, other):
return Div(self, other)
def input2values(self):
return [i.value if isinstance(i, Variable) else i for i in self.inputs]
def evaluate(self):
self.value = self.compute(self.input2values())
def compute(self, inputs):
""" Forward """
return inputs[0]
def gradient(self, inputs, output_grad):
""" backward """
return [output_grad]
class Placeholder(Variable):
def __init__(self, ):
super().__init__(None)
class Constant(Variable):
def __init__(self, value):
super().__init__(value)
self.grad = None
class Operator(Variable):
def __init__(self, *inputs):
super().__init__(*inputs)
class Add(Operator):
def compute(self, inputs):
return inputs[0] + inputs[1]
def gradient(self, inputs, output_grad):
return [output_grad, output_grad] # gradient of a and b
class Sub(Operator):
def compute(self, inputs):
return inputs[0] - inputs[1]
def gradient(self, inputs, output_grad):
return [output_grad, - output_grad]
class Mul(Operator):
def compute(self, inputs):
return inputs[0] * inputs[1]
def gradient(self, inputs, output_grad):
return [inputs[1] * output_grad, inputs[0] * output_grad]
class Div(Operator):
def compute(self, inputs):
return inputs[0] / inputs[1]
def gradient(self, inputs, output_grad):
return [1.0 / inputs[1] * output_grad, (- inputs[0] / inputs[1]**2) * output_grad]
class Ln(Operator):
def compute(self, inputs):
return math.log(inputs[0])
def gradient(self, inputs, output_grad):
return [1.0/inputs[0] * output_grad]
class Sin(Operator):
def compute(self, inputs):
return math.sin(inputs[0])
def gradient(self, inputs, output_grad):
return [math.cos(inputs[0]) * output_grad]
class Seesion:
def __init__(self):
self.topo_list: List[Variable] = [] # 拓扑排序的顺序就是正向求值的顺序
self.root = None
def topological_sorting(self, root: Variable):
def dfs(topo_list: List[Variable], node):
if not isinstance(node, Variable):
return
for n in node.inputs:
dfs(topo_list, n)
topo_list.append(node) # 同一个节点可以添加多次,他们的梯度会累加
topo_list = []
dfs(topo_list, root)
self.topo_list: List[Variable] = topo_list # 拓扑排序的顺序就是正向求值的顺序
self.root = root
def run(self, root, feed_dict: Dict[str, Union[int, float]] = None):
"""
按照拓扑排序的顺序对计算图求值。注意:因为我们之前对node采用了eager模式,
实际上每个node值之前已经计算好了,但为了演示lazy计算的效果,这里使用拓扑
排序又计算了一遍。
"""
self.topological_sorting(root)
node_evaluated = set() # 保证每个node只被求值一次
print("Forward:")
for node in self.topo_list:
if node.inputs[0] is None:
node.inputs = [feed_dict[node]]
if node not in node_evaluated:
node.evaluate()
node_evaluated.add(node)
print(f"evaluate: {node}")
return self.root.value
def gradients(self):
reverse_topo = list(reversed(self.topo_list)) # 按照拓扑排序的反向开始微分
reverse_topo[0].grad = 1.0 # 输出节点梯度是1.0
print("Backward:")
for node in reverse_topo:
grad = node.gradient(node.input2values(), node.grad)
# 将梯度累加到每一个输入变量的梯度上
for i, g in zip(node.inputs, grad):
if isinstance(i, Variable) and not isinstance(i, Constant):
i.grad += g
# print("AFTER AUTODIFF:")
# for node in reverse_topo:
# print(node)
if __name__ == "__main__":
x2 = Variable(5.0)
x1 = Placeholder()
# x1 = Variable(2.0)
# y = ln(x1) + x1*x2 - sin(x2)
f = Sub(Add(Ln(x1), Mul(x1, x2)), Sin(x2))
sess = Seesion()
out = sess.run(f, feed_dict={x1: 2.0})
sess.gradients() # 反向计算 自动微分
print(f"f = {out:.3f}")
print(f"x1.grad = {x1.grad:.3f}")
print(f"x2.grad = {x2.grad:.3f}")
# f = ln(x+2)
# sess = Seesion()
# x1 = Variable(2.0)
# c = Constant(2.0)
# f = Ln(Add(x1, c))
# out = sess.run(f)
# sess.gradients()
# print(x1.grad)
# sess = Seesion()
# x1 = Variable(2.0)
# c = Constant(2.0)
# f = Ln(x1 + c)
# out = sess.run(f)
# sess.gradients()
# print(x1.grad)
sess = Seesion()
x1 = Variable(2.0)
x2 = Variable(2.0)
f = x1 / x2
out = sess.run(f)
sess.gradients()
print(f"f = {out:.3f}")
print(f"x1.grad = {x1.grad:.3f}")
print(f"x2.grad = {x2.grad:.3f}")