High level api
custom model
定义模型需要继承自tff.learning.Model
类,同时根据联邦学习的流程需要定义好,模型训练和辅助训练变量。变量必须要使用tf
的类型,因为在实际环境中,联邦学习是要部署到移动端的,调用的不一定是python。
MnistVariables = collections.namedtuple('MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')
# total variable
def create_mnist_variables():
return MnistVariables(
weights=tf.Variable(
lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
name='weights',
trainable=True),
bias=tf.Variable(
lambda: tf.zeros(dtype=tf.float32, shape=(10)),
name='bias',
trainable=True),
num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))
有了模型的变量之后定义模型的前向传播过程,注意在前向传播过程中variable
的loss
等参数都进行了修改,同时定义Server
从Client
得到的数据。
def mnist_forward_pass(variables, batch):
y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
predictions = tf.cast(tf.argmax(y, 1), tf.int32)
flat_labels = tf.reshape(batch['y'], [-1])
loss = -tf.reduce_mean(
tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
accuracy = tf.reduce_mean(
tf.cast(tf.equal(predictions, flat_labels), tf.float32))
num_examples = tf.cast(tf.size(batch['y']), tf.float32)
variables.num_examples.assign_add(num_examples)
variables.loss_sum.assign_add(loss * num_examples)
variables.accuracy_sum.assign_add(accuracy * num_examples)
return loss, predictions
def get_local_mnist_metrics(variables):
return collections.OrderedDict(
num_examples=variables.num_examples,
loss=variables.loss_sum / variables.num_examples,
accuracy=variables.accuracy_sum / variables.num_examples)
在从client
得到数据后,server
要做的就是对数据进行整合。这里metrics
参数对应的是get_local_mnist_metrics
的所有结果。tff
是面向所有client
的,我理解的下面的操作都是从一个list dict中做加权平均,这里的metrics
参数没有体现list。
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
return collections.OrderedDict(
num_examples=tff.federated_sum(metrics.num_examples),
loss=tff.federated_mean(metrics.loss, metrics.num_examples),
accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))
有了上面的模型参数
、前向传播
、返回结果
和聚合结果
后,定义模型。这个定义模型,我理解的是对一个client
的模型,上述的模型参数
、前向传播
、返回结果
和聚合结果
都是针对client
而言的,猜测tff
从一个client
到多个clients
实现了一些包装,是这个过程没有体现在代码里。这里model
要实现定义模型参数,可训练参数,不可训练参数,前向传播,本地变量和指定输入数据类型,汇报结果和结果整合。其中tff.learning.BatchOutput
是tff
中封装输出结果的结构。
class MnistModel(tff.learning.Model):
def __init__(self):
self._variables = create_mnist_variables()
@property
def trainable_variables(self):
return [self._variables.weights, self._variables.bias]
@property
def non_trainable_variables(self):
return []
@property
def local_variables(self):
return [
self._variables.num_examples, self._variables.loss_sum,
self._variables.accuracy_sum
]
@property
def input_spec(self):
return collections.OrderedDict(
x=tf.TensorSpec([None, 784], tf.float32),
y=tf.TensorSpec([None, 1], tf.int32))
@tf.function
def forward_pass(self, batch, training=True):
del training
loss, predictions = mnist_forward_pass(self._variables, batch)
num_exmaples = tf.shape(batch['x'])[0]
return tff.learning.BatchOutput(
loss=loss, predictions=predictions, num_examples=num_exmaples)
@tf.function
def report_local_outputs(self):
return get_local_mnist_metrics(self._variables)
@property
def federated_output_computation(self):
return aggregate_mnist_metrics_across_clients
建立好模型之后进行模型训练:
iterative_process = tff.learning.build_federated_averaging_process(
MnistModel,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round 1, metrics={}'.format(metrics))
FC core
这部分是如何从底层创建联邦学习模型,首先要准备要了解一些概念。
数据类型
要显示的指出数据存储在C/S端,是全局唯一的还是多份拷贝的。注意的是使用print
函数输出时,会将数据类型和值一起输出,成为compat notation
。
-
与端无关的数据
- 张量类型,
tff.TensorType
。需要指定它的元素数据类型dtype
和形状shape
- 列表类型,
tff.SequenceType
。其中的元素类型应当为TFF的tff.Type
或者是能转换成tff.Type
的东西。print
打印列表类型数据时,会出现*
表示列表。 - 元组类型,
tff.NamedTupleType
。tff.NamedTupleType
接受三种类型的输入:list
,tuple
和collections.OrderedDict
。print
打印元组类型以<>
作为标记。 - 函数类型,
tff.FunctionType
。tff.FunctionType
需要指定函数的输入类型,且只能有一个输入值,和一个函数返回值。
- 张量类型,
-
与端有关的数据类型
端有关的类型,主要完成两件任务:
- 显式地定义数据值应该存放在C端还是S端(
Placement
)tff.SERVER
ortff.CLIENTS
- 定义这个数据是否全局一致(
All equal
)
- 显式地定义数据值应该存放在C端还是S端(
-
联邦数据类型
tff.FederatedType
把上面提到的端无关类型包装起来,并增加placement
和all_equal
两个属性。其中all_equal
可选,如果placement=tff.SERVER
,则默认为True
。使用print
函数打印变量时,花括号{}
表示非全局唯一,而没有花括号就表示全局唯一。 -
变量声明
定义变量类型后,声明变量使用
tff.utils.create_variables(name, type)
,如OUR_TYPE = tff.TensorType(tf.int8, shape=[10]) var = tff.utils.create_variables('var_name', OUR_TYPE) print(OUR_TYPE) print(var)
函数定义
-
与端无关的函数
函数需要使用
tff.tf_computation(type)
来wrap up
函数,其中type
表示函数传入形参x
的类型。@tff.tf_computation(tff.SequenceType(tf.int32)) def add_up_integeres(x): return x.reduce(np.int32(0), lambda x, y: x+y)
-
与端有关的函数
与端有关的函数不仅需要指定类型,还需要指定
placement
。装饰器也变为tff.federated_computation
@tff.federated_computation(tff.FederatedType(tf.float32, tff.Clients)) def get_average_temperature(sensor_readings): return tff.federated_mean(sensor_readings) print(get_average_temperature.type_signature)
逻辑回归实例
下面以逻辑回归为例,整理数据准备到模型训练的过程。跟上面的high-level api
的明显区别是,从底层构建联邦学习要明确定义好,函数的输入输出类型。
-
准备数据
数据存放是长度为10的list->每个数字user个batch这样的格式。例如
federated_train_data[5]
表示就是数字都为5的batch list。import collections import numpy as np import tensorflow as tf import tensorflow_federated as tff tf.compat.v1.enable_v2_behavior() tff.framework.set_default_executor(tff.framework.ReferenceExecutor()) mnist_train, mnist_test = tf.keras.datasets.mnist.load_data() NUM_EXAMPLES_PER_USER = 1000 BATCH_SIZE = 50 def get_data_for_digit(source, digit): output_sequence = [] all_samples = [i for i, d in enumerate(source[1]) if d == digit] for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE): batch_samples = all_samples[i:i + BATCH_SIZE] output_sequence.append({ 'x': np.array([source[0][i].flatten() / 255.0 for i in batch_samples], dtype=np.float32), 'y': np.array([source[1][i] for i in batch_samples], dtype=np.int32) }) return output_sequence federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)] federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]
在整理好训练数据后,定义每个batch的数据类型
BATCH_SPEC = collections.OrderedDict( x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32), y=tf.TensorSpec(shape=[None], dtype=tf.int32)) BATCH_TYPE = tff.to_type(BATCH_SPEC)
-
batch的前向传播计算
在计算前向传播的时候需要模型的参数,先定义好参数数据类型
MODEL_SPEC = collection.OrderedDict( weights = tf.TensorSpec(shape=[784, 10], dtype=tf.float32), bias = tf.TensorSpec(shape=[10], dtype=tf.float32)) MODEL_TYPEP = tff.to_type(MODEL_SPEC)
在封装前向传播的时候,使用了tf定义的前向传播函数。
@tf.function def forward_pass(model, batch): predicted_y = tf.nn.softmax(tf.matmul(batch['x'], model['weights'])+model['bias']) return -tf.reduce_mean(tf.reduce_sum(tf.onehot(batch['y'], 10)*tf.math.log(predicted_y), axis=[1])) @tff.tf.computation(MODEL_TYPE, BATCH_TYPE) def batch_loss(model, batch): return forward_pass(model, batch) ''' # check initial_model = collections.OrderedDict( weights=np.zeros([784, 10], dtype=np.float32), bias=np.zeros([10], dtype=np.float32)) sample_batch = federated_train_data[5][-1] print(batch_loss(initial_model, sample_batch)) '''
-
batch optimization
batch优化的时候,仍然使用
tf
的优化器,返回的是一个参数修改后的model
# optimizer @tff.tf_computation(MODEL_TYPE, BATCH_TYPE, tf.float32) def batch_train(initial_model, batch, learning_rate): # Define a group of model variables and set them to `initial_model`. Must # be defined outside the @tf.function. model_vars = collections.OrderedDict([ (name, tf.Variable(name=name, initial_value=value)) for name, value in initial_model.items() ]) optimizer = tf.keras.optimizers.SGD(learning_rate) @tf.function def _train_on_batch(model_vars, batch): # Perform one step of gradient descent using loss from `batch_loss`. with tf.GradientTape() as tape: loss = forward_pass(model_vars, batch) grads = tape.gradient(loss, model_vars) optimizer.apply_gradients( zip(tf.nest.flatten(grads), tf.nest.flatten(model_vars))) return model_vars return _train_on_batch(model_vars, batch) print(str(batch_train.type_signature)) ''' # check model = initial_model losses = [] for _ in range(5): model = batch_train(model, sample_batch, 0.1) losses.append(batch_loss(model, sample_batch)) print("5 loops loss:", losses) '''
-
sequence batch optimization
上面的
batch_train
是针对一个batch而言的,tff
是要面对多个clients
的,调用tff
的api构建sequence batch训练函数。LOCAL_DATA_TYPE = tff.SequenceType(BATCH_TYPE) @tff.federated_computation(MODEL_TYPE, tf.float32, LOCAL_DATA_TYPE) def local_train(initial_model, learning_rate, all_batches): # Mapping function to apply to each batch. @tff.federated_computation(MODEL_TYPE, BATCH_TYPE) def batch_fn(model, batch): return batch_train(model, batch, learning_rate) return tff.sequence_reduce(all_batches, initial_model, batch_fn) print(str(local_train.type_signature)) ''' # check locally_trained_model = local_train(initial_model, 0.1, federated_train_data[5]) '''
tff.sequence_reduce
的参数是value, zero, op
,其中op
是一个输入<U, T> -> U
的函数。 -
sequence_metric
同上面一样,对sequence进行评估
# metric @tff.federated_computation(MODEL_TYPE, LOCAL_DATA_TYPE) def local_eval(model, all_batches): return tff.sequence_sum( tff.sequence_map( tff.federated_computation(lambda b: batch_loss(model, b), BATCH_TYPE), all_batches))
-
federated train
到目前为止,我们只是对某一种数字图片
sequence batch
进行了训练,下面进行对所有类型图片进行评估和学习。SERVER_MODEL_TYPE = tff.FederatedType(MODEL_TYPE, tff.SERVER) CLIENT_DATA_TYPE = tff.FederatedType(LOCAL_DATA_TYPE, tff.CLIENTS) @tff.federated_computation(SERVER_MODEL_TYPE, CLIENT_DATA_TYPE) def federated_eval(model, data): return tff.federated_mean( tff.federated_map(local_eval, [tff.federated_broadcast(model), data])) ''' print('initial_model loss =', federated_eval(initial_model, federated_train_data)) print('locally_trained_model loss =', federated_eval(locally_trained_model, federated_train_data)) ''' SERVER_FLOAT_TYPE = tff.FederatedType(tf.float32, tff.SERVER) @tff.federated_computation(SERVER_MODEL_TYPE, SERVER_FLOAT_TYPE, CLIENT_DATA_TYPE) def federated_train(model, learning_rate, data): return tff.federated_mean( tff.federated_map(local_train, [ tff.federated_broadcast(model), tff.federated_broadcast(learning_rate), data ])) ''' # implement model = initial_model learning_rate = 0.1 for round_num in range(5): # 每一轮,把大家的模型分别更新一下,取平均之后拿回来(做赋值替换) model = federated_train(model, learning_rate, federated_train_data) # 把学习率减小一点 learning_rate = learning_rate * 0.9 # 算个loss输出一下 loss = federated_eval(model, federated_train_data) print('round {}, loss={}'.format(round_num, loss)) '''
最后
tff.federated_mean
对MODEL_TYPE
类型进行了平均。
例子总结
这个例子有点奇奇怪怪的,比如它的数据的分割方式,是client
只有一种类型图片的batch,最后联邦学习average的竟然是从不同数据学习到的model
,这和联邦学习有点不一样。之后改进的话
- 数据分割方式。应该改是
clients->batchs
- 训练方式。应该是先定义
batch
再到一个client``batches
的完整训练,然后在调用tff.federated_map
、tff.federated_mean
进行聚合。
编写算法的时候也可以借鉴它的流程,首先完成对batch的优化,然后对client的优化,然后对所有的client的结果聚合。最近自己也会编一个类似的代码(可能不是基于tff的),尝试一下。