Issue
Typical EncoderDecoderModel that works on a Pre-coded Dataset
The code snippet below is frequently used to train an EncoderDecoderModel from Hugging Face's transformers library:
from transformers import EncoderDecoderModel
from transformers import PreTrainedTokenizerFast
multibert = EncoderDecoderModel.from_encoder_decoder_pretrained(
"bert-base-multilingual-uncased", "bert-base-multilingual-uncased"
)
tokenizer = PreTrainedTokenizerFast.from_pretrained("bert-base-multilingual-uncased")
...
A pre-processed (pre-coded) dataset, such as wmt14, can then be used to train the model as follows:
import datasets
train_data = datasets.load_dataset("wmt14", "de-en", split="train")
val_data = datasets.load_dataset("wmt14", "de-en", split="validation[:10%]")
from functools import partial
def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    # Tokenize the English source sentences for the encoder
    inputs = tokenizer([segment["en"] for segment in batch['translation']],
                       padding="max_length", truncation=True, max_length=encoder_max_length)
    # Tokenize the German target sentences for the decoder (note: decoder_max_length)
    outputs = tokenizer([segment["de"] for segment in batch['translation']],
                        padding="max_length", truncation=True, max_length=decoder_max_length)

    batch["input_ids"] = inputs.input_ids
    batch["attention_mask"] = inputs.attention_mask
    batch["decoder_input_ids"] = outputs.input_ids
    batch["decoder_attention_mask"] = outputs.attention_mask
    batch["labels"] = outputs.input_ids.copy()

    # because BERT automatically shifts the labels, the labels correspond exactly to `decoder_input_ids`.
    # We have to make sure that the PAD token is ignored
    batch["labels"] = [[-100 if token == tokenizer.pad_token_id else token for token in labels]
                       for labels in batch["labels"]]
    return batch

def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    bert_wants_to_see = ["input_ids", "attention_mask", "decoder_input_ids",
                         "decoder_attention_mask", "labels"]

    _process_data_to_model_inputs = partial(process_data_to_model_inputs,
                                            encoder_max_length=encoder_max_length,
                                            decoder_max_length=decoder_max_length,
                                            batch_size=batch_size
                                            )
    dataset = dataset.map(_process_data_to_model_inputs,
                          batched=True,
                          batch_size=batch_size
                          )
    dataset.set_format(type="torch", columns=bert_wants_to_see)
    return dataset

train_data = munge_dataset_to_pacify_bert(train_data)
val_data = munge_dataset_to_pacify_bert(val_data)
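To make the label masking step in process_data_to_model_inputs concrete, here is a minimal, self-contained sketch on toy data. The token IDs are made up for illustration, and 0 is only assumed to stand in for tokenizer.pad_token_id; the helper name mask_padding is hypothetical. The value -100 is the default ignore_index of PyTorch's cross-entropy loss, so masked positions do not contribute to the loss.

```python
IGNORE_INDEX = -100  # default ignore_index of torch.nn.CrossEntropyLoss


def mask_padding(label_ids, pad_token_id):
    """Replace every padding ID with IGNORE_INDEX so the loss skips it."""
    return [
        [IGNORE_INDEX if token == pad_token_id else token for token in sequence]
        for sequence in label_ids
    ]


# Hypothetical padded token IDs; 0 stands in for tokenizer.pad_token_id
padded = [[101, 7592, 102, 0, 0], [101, 102, 0, 0, 0]]
masked = mask_padding(padded, pad_token_id=0)
print(masked)
```

Only the labels are masked this way; decoder_input_ids keep their padding IDs, and decoder_attention_mask tells the model to ignore them.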
Then the training can be done easily as such:
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments
# set training arguments - these params are not really tuned, feel free to change
training_args = Seq2SeqTrainingArguments(
    output_dir="./",
    evaluation_strategy="steps",
    ...
)

# instantiate trainer
trainer = Seq2SeqTrainer(
    model=multibert,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=train_data,
    eval_dataset=val_data,
)
trainer.train()
A working example can be found at: https://www.kaggle.com/code/alvations/neural-plasticity-bert2bert-on-wmt14
However, parallel data used to train an EncoderDecoderModel usually exists as .txt or .tsv files, not a pre-coded dataset
Given a large .tsv file (e.g. 1 billion lines) that looks like this:
hello world\tHallo Welt
how are you?\twie gehts?
...\t...
Step 1: Convert the file into the parquet / pyarrow format, with something like:
import vaex # Using vaex
import sys
filename = "train.en-de.tsv"
df = vaex.from_csv(filename, sep="\t", header=None, names=["src", "trg"], convert=True, chunk_size=50_000_000)
df.export(f"{filename}.parquet")
Step 2: Read it into a PyArrow table to fit it into a datasets.Dataset object, then apply munge_dataset_to_pacify_bert() as shown above, e.g.
from datasets import Dataset, load_from_disk
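import pyarrow.compute as pc
import pyarrow.parquet as pq
# Read the parquet file, drop rows that contain nulls, and wrap the table in a Dataset
_ds = Dataset(pc.drop_null(pq.read_table('train.en-de.tsv.parquet')))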
_ds.save_to_disk('train.en-de.tsv.parquet.hfdataset')
_ds = load_from_disk('train.en-de.tsv.parquet.hfdataset')
train_data = munge_dataset_to_pacify_bert(_ds)
train_data.save_to_disk('train.en-de.tsv.parquet.hfdataset')
While the process above works well for a small-ish dataset, e.g. 1-5 million lines of data, when the scale of the data goes to 500 million to 1 billion lines, the last .save_to_disk() call seems to run "forever", with no end in sight.
Breaking down the steps in munge_dataset_to_pacify_bert(), there are 2 sub-functions:
dataset.map(_process_data_to_model_inputs, batched=True, batch_size=batch_size)
dataset.set_format(type="torch", columns=bert_wants_to_see)
For the .map() step, it's possible to scale across parallel processes by specifying num_proc:
dataset.map(_process_data_to_model_inputs,
            batched=True, batch_size=100,
            num_proc=32  # number of parallel worker processes.
            )
When I tried to process with num_proc=32 and batch_size=100, the .map() function finished processing 500 million lines in 18 hours of compute time, at best, on an Intel Xeon E5-2686 @ 2.3GHz with 32 processor cores.
But somehow the .map() function created 32 temp .arrow files and 128 tmp... binary files. The last save_to_disk call has now been running for more than 10 hours and has still not finished combining the temp files into the final HF Dataset on disk.
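For a sense of scale, the timing above can be turned into a rough throughput figure. This is only back-of-the-envelope arithmetic on the numbers quoted above; the extrapolation to 1 billion lines assumes throughput stays constant as the data grows, which is an assumption and not something that was measured.

```python
lines_done = 500_000_000  # lines processed by .map()
hours = 18                # wall-clock time reported above
num_proc = 32             # worker processes passed to .map()

lines_per_second = lines_done / (hours * 3600)
lines_per_second_per_proc = lines_per_second / num_proc

# Assumption: throughput stays constant when the input doubles
hours_for_1b = 1_000_000_000 / lines_per_second / 3600

print(f"{lines_per_second:,.0f} lines/s overall")
print(f"{lines_per_second_per_proc:,.0f} lines/s per process")
print(f"{hours_for_1b:.1f} hours for 1 billion lines")
```

That works out to roughly 7,700 lines per second overall, about 240 lines per second per process, and about 36 hours for 1 billion lines under the constant-throughput assumption, before any time spent in save_to_disk().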
Given the above context, my questions in parts are:
Question (Part 1): When the mapping function ends and has created the temp .arrow and tmp... files, is there a way to read these individually instead of trying to save them into a final directory using the save_to_disk() function?
Question (Part 2): Why is the save_to_disk() function so slow after the mapping, and how can the mapped, processed data be saved faster?
Question (Part 3): Is there a way to avoid the .set_format() call after the .map() and make it part of the _process_data_to_model_inputs function?
Solution
TL;DR
(Credit for the answer goes to @lhoestq)
If you have a TSV file that looks like this:
hello world\tHallo Welt
how are you?\twie gehts?
...\t...
load the dataset as such:
from datasets import load_dataset

# tatoeba-sentpairs.tsv is a pretty large file.
ds = load_dataset("csv", data_files="../input/tatoeba/tatoeba-sentpairs.tsv",
                  streaming=True, delimiter="\t", split="train")
In Long
Reasons not to convert to parquet, run map functions and save the outputs:
- Converting a large dataset into parquet is already quite a feat (see Step 1 in the question), so let's avoid that.
- Mapping the data into the BERT format, i.e. munge_dataset_to_pacify_bert, is also quite an expensive operation. If that is done for 1B lines, even if it's parallelized, it will take hours to days to complete.
- The resulting tensors that are saved with dataset.set_format(type="torch") are massive; ~50GB of TSV with 1B lines will easily become TBs of binaries.
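A rough size estimate shows why the padded output reaches terabytes. This is a sketch under stated assumptions, not a measurement: it assumes every row is padded to 512 tokens in all five columns from the question (input_ids, attention_mask, decoder_input_ids, decoder_attention_mask, labels) and that each value takes 8 bytes (a 64-bit integer). The actual on-disk size depends on the integer types the library picks per column, so treat this as an upper-bound style figure. The function name estimate_bytes is hypothetical.

```python
def estimate_bytes(num_rows, seq_len=512, num_columns=5, bytes_per_value=8):
    """Size of fully padded token columns, ignoring metadata and compression."""
    return num_rows * seq_len * num_columns * bytes_per_value


total_bytes = estimate_bytes(1_000_000_000)  # 1B sentence pairs
total_terabytes = total_bytes / 1e12

print(f"{total_terabytes:.2f} TB")
```

Even if smaller integer types cut this by several times, padding every short sentence to 512 tokens still inflates a ~50GB text file by orders of magnitude.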
Instead, use stream-style processing. Hugging Face datasets supports it with streaming=True when defining the dataset:
ds = load_dataset("csv", data_files="../input/tatoeba/tatoeba-sentpairs.tsv",
                  streaming=True, delimiter="\t", split="train")
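To illustrate what stream-style processing means, here is a plain-Python sketch that reads a TSV lazily and hands out small batches, so that only one batch is held in memory at a time. This is an illustration of the idea only and not how the datasets library implements streaming; the helper names iter_pairs and iter_batches are hypothetical, and an in-memory string stands in for the real file.

```python
import csv
import io
from itertools import islice


def iter_pairs(lines):
    """Lazily yield (source, target) pairs from tab-separated lines."""
    reader = csv.reader(lines, delimiter="\t", quoting=csv.QUOTE_NONE)
    for row in reader:
        # Skip malformed rows instead of failing halfway through a huge file
        if len(row) == 2 and row[0] and row[1]:
            yield row[0], row[1]


def iter_batches(pairs, batch_size):
    """Group a lazy stream of pairs into lists of at most batch_size items."""
    iterator = iter(pairs)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Stand-in for: open("train.en-de.tsv", encoding="utf-8", newline="")
sample = io.StringIO("hello world\tHallo Welt\nhow are you?\twie gehts?\nbroken line\n")
batches = list(iter_batches(iter_pairs(sample), batch_size=2))
print(batches)
```

With this pattern, tokenization can be applied to each batch as it is consumed during training, so nothing padded or tokenized ever needs to be written to disk.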
Answered By - alvas