Issue
I'm trying to run the multi-worker MNIST CPU training (MultiWorkerMirroredStrategy) from the TensorFlow website using TensorFlow 2.9.1 with Python 3.9. Three workers are launched on the same host. Workers 1 and 2 terminate without error, but worker 0 reports the error:
UNAVAILABLE: Error reported from /job:worker/task:2: Task /job:worker/replica:0/task:2 heartbeat timeout. This indicates that the remote task has failed, got preempted, or crashed unexpectedly. [type.googleapis.com/tensorflow.CoordinationServiceError='"\n\n\x06worker\x10\x02']
Code tf_mnist_multi_worker.py:
import json
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
    """Build the MNIST training dataset used for multi-worker training.

    Loads the MNIST training split through `tf.keras.datasets.mnist`,
    rescales the images, casts the labels to int64, and returns a dataset
    that is shuffled, repeated indefinitely and batched.

    Args:
        batch_size: Number of examples per batch of the returned dataset.

    Returns:
        A `tf.data.Dataset` yielding `(images, labels)` batches.
    """
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    # `repeat()` makes the dataset infinite, so training must bound the
    # number of steps itself (e.g. via `steps_per_epoch`).
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset
def build_and_compile_cnn_model():
    """Create and compile the small CNN classifier for 28x28 MNIST images.

    The network reshapes each image to (28, 28, 1), applies one 3x3
    convolution with 32 filters, flattens, and ends in a 10-unit dense
    layer without activation (the loss is configured with
    `from_logits=True`).

    Returns:
        A compiled `tf.keras.Sequential` model using SGD (learning rate
        0.001), sparse categorical cross-entropy and the accuracy metric.
    """
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model
# The cluster layout and this process's task index are read from TF_CONFIG.
tf_config = json.loads(os.environ['TF_CONFIG'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

PER_WORKER_BATCH_SIZE = 64
# The dataset is batched at the global size: per-worker size times the
# number of workers listed in the cluster.
global_batch_size = PER_WORKER_BATCH_SIZE * len(tf_config['cluster']['worker'])

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA

multi_worker_dataset = mnist_dataset(global_batch_size)
multi_worker_dataset = multi_worker_dataset.with_options(options)

with strategy.scope():
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

# In this version only the worker with task index 0 saves the model.
if tf_config['task']['index'] == 0:
    multi_worker_model.save(".")
In three separate terminals the following three command sequences are executed:
Linux Terminal 1:
export TF_CONFIG='{"cluster": {"worker": ["localhost:7000", "localhost:7001", "localhost:7002"]}, "task": {"type": "worker", "index": 0}}'
python tf_mnist_multi_worker.py
Linux Terminal 2:
export TF_CONFIG='{"cluster": {"worker": ["localhost:7000", "localhost:7001", "localhost:7002"]}, "task": {"type": "worker", "index": 1}}'
python tf_mnist_multi_worker.py
Linux Terminal 3:
export TF_CONFIG='{"cluster": {"worker": ["localhost:7000", "localhost:7001", "localhost:7002"]}, "task": {"type": "worker", "index": 2}}'
python tf_mnist_multi_worker.py
Full error log of worker 0:
2022-07-05 08:38:52.253059: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-07-05 08:38:52.253115: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-07-05 08:38:53.656584: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-07-05 08:38:53.656637: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-07-05 08:38:53.656668: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (blipp65.sdp.research.bell-labs.com): /proc/driver/nvidia/version does not exist
2022-07-05 08:38:53.657282: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-07-05 08:38:53.667990: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:7000, 1 -> localhost:7001, 2 -> localhost:7002}
2022-07-05 08:38:53.668098: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:7000, 1 -> localhost:7001, 2 -> localhost:7002}
2022-07-05 08:38:53.668800: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:438] Started server with target: grpc://localhost:7000
2022-07-05 08:38:55.567814: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 8s 76ms/step - loss: 2.2734 - accuracy: 0.1854
Epoch 2/3
70/70 [==============================] - 5s 73ms/step - loss: 2.2141 - accuracy: 0.3222
Epoch 3/3
70/70 [==============================] - 5s 71ms/step - loss: 2.1473 - accuracy: 0.4274
WARNING:absl:Found untraced functions such as _jit_compiled_convolution_op while saving (showing 1 of 1). These functions will not be directly callable after loading.
2022-07-05 08:39:24.672522: E tensorflow/core/common_runtime/base_collective_executor.cc:249] BaseCollectiveExecutor::StartAbort UNAVAILABLE: Error reported from /job:worker/task:1: Task /job:worker/replica:0/task:1 heartbeat timeout. This indicates that the remote task has failed, got preempted, or crashed unexpectedly. [type.googleapis.com/tensorflow.CoordinationServiceError='\"\n\n\x06worker\x10\x01']
2022-07-05 08:39:24.674058: E tensorflow/core/common_runtime/ring_alg.cc:290] Aborting RingReduce with UNAVAILABLE: Collective ops is aborted by: Error reported from /job:worker/task:1: Task /job:worker/replica:0/task:1 heartbeat timeout. This indicates that the remote task has failed, got preempted, or crashed unexpectedly.
The error could be from a previous operation. Restart your program to reset. [type.googleapis.com/tensorflow.DerivedStatus='']
Traceback (most recent call last):
File "./tf_mnist_multi_local/tf_mnist_multi_worker.py", line 59, in <module>
multi_worker_model.save(".")
File "./miniconda3/envs/tfmnist/lib/python3.9/site-packages/keras/utils/traceback_utils.py", line 67, in error_handler
raise e.with_traceback(filtered_tb) from None
File "./miniconda3/envs/tfmnist/lib/python3.9/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "./miniconda3/envs/tfmnist/lib/python3.9/multiprocessing/pool.py", line 771, in get
raise self._value
File "./miniconda3/envs/tfmnist/lib/python3.9/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "./miniconda3/envs/tfmnist/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
return list(map(*args))
tensorflow.python.framework.errors_impl.UnavailableError: Collective ops is aborted by: Error reported from /job:worker/task:1: Task /job:worker/replica:0/task:1 heartbeat timeout. This indicates that the remote task has failed, got preempted, or crashed unexpectedly.
The error could be from a previous operation. Restart your program to reset. [Op:CollectiveReduceV2]
Any idea what may be wrong?
Solution
It turns out that multi_worker_model.save() must be called on all workers, even though the output saved by the non-zero workers is not needed.
Here is the full working code (notice the last 2 lines, which were added):
import json
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
    """Build the MNIST training dataset used for multi-worker training.

    Loads the MNIST training split through `tf.keras.datasets.mnist`,
    rescales the images, casts the labels to int64, and returns a dataset
    that is shuffled, repeated indefinitely and batched.

    Args:
        batch_size: Number of examples per batch of the returned dataset.

    Returns:
        A `tf.data.Dataset` yielding `(images, labels)` batches.
    """
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    # `repeat()` makes the dataset infinite, so training must bound the
    # number of steps itself (e.g. via `steps_per_epoch`).
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset
def build_and_compile_cnn_model():
    """Create and compile the small CNN classifier for 28x28 MNIST images.

    The network reshapes each image to (28, 28, 1), applies one 3x3
    convolution with 32 filters, flattens, and ends in a 10-unit dense
    layer without activation (the loss is configured with
    `from_logits=True`).

    Returns:
        A compiled `tf.keras.Sequential` model using SGD (learning rate
        0.001), sparse categorical cross-entropy and the accuracy metric.
    """
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model
# The cluster layout and this process's task index are read from TF_CONFIG.
tf_config = json.loads(os.environ['TF_CONFIG'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

PER_WORKER_BATCH_SIZE = 64
# The dataset is batched at the global size: per-worker size times the
# number of workers listed in the cluster.
global_batch_size = PER_WORKER_BATCH_SIZE * len(tf_config['cluster']['worker'])

with strategy.scope():
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    multi_worker_dataset = mnist_dataset(global_batch_size)
    multi_worker_dataset = multi_worker_dataset.with_options(options)
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

# Every worker must call `save()`: saving runs a collective op, so a worker
# that skips it leaves worker 0 waiting until the heartbeat times out.
# Non-zero workers write to a per-worker directory whose contents are not
# needed afterwards.
if tf_config['task']['index'] == 0:
    multi_worker_model.save(".")
else:
    multi_worker_model.save(f"./workertmp{tf_config['task']['index']}")
Answered By - SJ1928
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.