Issue
I have created a custom encoder/decoder like so:
import tensorflow as tf
from tensorflow_model_optimization.python.core.internal import tensor_encoding as te
# noinspection PyUnresolvedReferences
class SparseTernaryCompressionEncodingStage(te.core.EncodingStageInterface):
AVERAGE = 'average'
NEGATIVES = 'negatives'
POSITIVES = 'positives'
TESTING = 'testing'
NEW_SHAPE = 'new_shape'
ORIGINAL_SHAPE = 'original_shape'
def name(self):
pass
def compressible_tensors_keys(self):
pass
def commutes_with_sum(self):
pass
def decode_needs_input_shape(self):
pass
def get_params(self):
pass
def encode(self, original_tensor, encode_params):
original_shape = tf.shape(original_tensor)
tensor = tf.reshape(original_tensor, [-1])
sparsification_rate = int(len(tensor) / 100 * 1)
new_shape = tensor.get_shape().as_list()
if sparsification_rate == 0:
sparsification_rate = 1
mask = tf.cast(tf.abs(tensor) >= tf.math.top_k(tf.abs(tensor), sparsification_rate)[0][-1], tf.float32)
inv_mask = tf.cast(tf.abs(tensor) < tf.math.top_k(tf.abs(tensor), sparsification_rate)[0][-1], tf.float32)
tensor_masked = tf.multiply(tensor, mask)
average = tf.reduce_sum(tf.abs(tensor_masked)) / sparsification_rate
compressed_tensor = tf.add(tf.multiply(average, mask) * tf.sign(tensor), tf.multiply(tensor_masked, inv_mask))
negatives = tf.where(compressed_tensor < 0)
positives = tf.where(compressed_tensor > 0)
encoded_x = {self.AVERAGE: average, self.NEGATIVES: negatives, self.POSITIVES: positives,
self.NEW_SHAPE: new_shape, self.ORIGINAL_SHAPE: original_shape}
return encoded_x
def decode(self, encoded_tensors, decode_params, num_summands=None, shape=None):
decompressed_tensor = tf.zeros(self.NEW_SHAPE, tf.float32)
average_values_negative = tf.fill([len(self.NEGATIVES), ], -self.AVERAGE)
average_values_positive = tf.fill([len(self.POSITIVES), ], self.AVERAGE)
decompressed_tensor = tf.tensor_scatter_nd_update(decompressed_tensor, self.NEGATIVES, average_values_negative)
decompressed_tensor = tf.tensor_scatter_nd_update(decompressed_tensor, self.POSITIVES, average_values_positive)
decompressed_tensor = tf.reshape(decompressed_tensor, self.ORIGINAL_SHAPE)
return decompressed_tensor
Now, i would like to use the encode function to encode all the weights that the client send to the server and, on the server, use the decode function to be able to obtain all the weights back. Basically, instead of sending all the weights from the client to the server, i want to send only some necessaries information that will let me able to create the weights back from only 5 informations.
The problem is that i don't understand how to tell the client to use this encoder to send the information and to the server to use the decoder before trying to do:
round_model_delta = tff.federated_mean(client_outputs.weights_delta, weight=weight_denom)
I'm using Tensorflow Federated simple_fedavg as basic project.
Solution
If you only want to modify the aggregation, you may have easier time using the tff.learning
APIs with what you have, parameterizing the aggregation with a tff.aggregators
object. For instance:
te.core.EncoderComposer(te.testing.PlusOneOverNEncodingStage()).make()
def encoder_fn(value_spec):
return te.encoders.as_gather_encoder(
te.core.EncoderComposer(SparseTernaryCompressionEncodingStage()).make(),
value_spec)
tff.learning.build_federated_averaging_process(
..., # Other args.
model_update_aggregation_factory=tff.aggregators.EncodedSumFactory(
encoder_fn))
You may also find these tutorials helpful:
- https://www.tensorflow.org/federated/tutorials/tuning_recommended_aggregators
- https://www.tensorflow.org/federated/tutorials/custom_aggregators
Answered By - Jakub Konecny
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.