Issue
Update All Documents
Background Information
I have a use case where I need to update all documents in my index. My source looks similar to the below:
{
  'hits': [
    {'_index': 'main-index-v2',
     '_type': '_doc',
     '_id': 'ID_xzeta4955029dhs82901',
     '_score': 8.403202,
     '_source': {'id': 'ID_xzeta4955029dhs82901',
                 'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
                 'non_employee_ids': [],
                 'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
                 'local_date': '2022/01/10',
                 'local': True,
                 ...
    }
I can easily search my index using the multi_match query, however this is for a single ID.
def create_multi_query(ids: str, fields: list = None):
    # Avoid a mutable default argument; fall back to the three ID fields.
    if fields is None:
        fields = ['employee_ids', 'non_employee_ids', 'friends_id']
    return {
        "query": {
            "multi_match": {
                "query": f"{ids}",
                "fields": fields,
                "operator": "or"
            }
        }
    }

hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')
I want to provide a dictionary and list of fields as parameters to update my index.
Example:
{'J1234': 'J2875', 'CH1234': 'J2879'}
The dictionary maps old IDs to new IDs. I want to update every field that contains an old ID.
My Solution (Thus far)
I have written a Painless script to update the IDs, but it needs a separate for loop for each field. For each field, the script walks the list item by item: if the current item equals the parameter 'fromId', it appends 'toId' to a new list; otherwise it appends the current item unchanged. It then sets the field to the new list.
Painless Script example
def result = [];
for (def item : ctx._source.employee_ids) {
  if (item == params.fromId) {
    result.add(params.toId);
  } else {
    result.add(item);
  }
}
ctx._source.employee_ids = result;

def resultF = [];
for (def item : ctx._source.friends_id) {
  if (item == params.fromId) {
    resultF.add(params.toId);
  } else {
    resultF.add(item);
  }
}
ctx._source.friends_id = resultF;
This can be executed via UpdateByQuery from the elasticsearch_dsl library.
Example of the Update call.
def partial_update(es, items: dict):
    assert es.ping() is True
    tmp = []
    # One update-by-query call per old/new ID pair.
    for from_id, to_id in items.items():
        result = execute_intermediate(from_id, to_id)
        tmp.append(result)
    return tmp

@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
    from elasticsearch_dsl import UpdateByQuery
    ubq = UpdateByQuery(
        using=auth_es(),
        doc_type='doc', index=settings.ES_WRITE_INDEX,
    )
    ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
    ubq = ubq.params(wait_for_completion=True)
    res = ubq.execute().to_dict()
    return res
The intermediate function executes the update for a single ID and is wrapped with a retry decorator.
Issues
Doing it this way requires me to loop through my dictionary one by one to perform the update.
If I want to update more fields, I need to add a new for loop to the script for each one.
Questions
What is the best / most optimal solution to update all fields in source based on the above?
Is there a way to send a dictionary to find all the documents matching the keys, updating with the values in a single call?
Solution
There is no out-of-the-box solution for this.
One improvement to the existing Painless script is to modify the arrays in place and to pass a map of old IDs to new IDs in params, together with a list of the fields to update.
PUT /test_replace_id/
{
  "mappings": {
    "properties": {
      "employee_ids": {
        "type": "keyword"
      }
    }
  }
}
POST /test_replace_id/_doc/1
{
  "employee_ids": ["old1", "old2"],
  "friends_id": "old1"
}
POST /test_replace_id/_update/1
{
  "script": {
    "source": """
      for (t in params.targets) {
        if (ctx._source[t] instanceof List) {
          for (int j = 0; j < ctx._source[t].length; j++) {
            if (params.map.containsKey(ctx._source[t][j])) {
              ctx._source[t][j] = params.map.get(ctx._source[t][j]);
            }
          }
        } else {
          if (params.map.containsKey(ctx._source[t])) {
            ctx._source[t] = params.map.get(ctx._source[t]);
          }
        }
      }
    """,
    "params": {
      "targets": ["employee_ids", "friends_id"],
      "map": {"old1": "new1"}
    }
  }
}
GET /test_replace_id/_search
This is more flexible: there is no need to loop over the dictionary and issue one update per ID, and adding a field only means adding its name to "targets". The whole mapping can now be sent in a single request.
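The test above updates a single document through _update. To apply the same script to every matching document in one call, a request body for update-by-query could be built roughly as follows. This is only a sketch: build_update_by_query_body is a hypothetical helper name, the terms filter assumes the target fields are mapped as keyword (as in the test index above), and the commented-out client call assumes the 7.x-style es object and index name from the question.

```python
from typing import Dict, List

# Painless source copied from the script in the test request above.
REPLACE_IDS_SCRIPT = """
for (t in params.targets) {
  if (ctx._source[t] instanceof List) {
    for (int j = 0; j < ctx._source[t].length; j++) {
      if (params.map.containsKey(ctx._source[t][j])) {
        ctx._source[t][j] = params.map.get(ctx._source[t][j]);
      }
    }
  } else {
    if (params.map.containsKey(ctx._source[t])) {
      ctx._source[t] = params.map.get(ctx._source[t]);
    }
  }
}
"""


def build_update_by_query_body(id_map: Dict[str, str], targets: List[str]) -> dict:
    """Build one update-by-query body covering every old ID and target field.

    Hypothetical helper, not part of any Elasticsearch client library.
    """
    old_ids = list(id_map)
    return {
        # Only touch documents where at least one target field holds an old ID.
        # Assumption: target fields are keyword-mapped, so `terms` matches exactly.
        "query": {
            "bool": {
                "should": [{"terms": {field: old_ids}} for field in targets],
                "minimum_should_match": 1,
            }
        },
        "script": {
            "lang": "painless",
            "source": REPLACE_IDS_SCRIPT,
            "params": {"targets": targets, "map": id_map},
        },
    }


body = build_update_by_query_body(
    {"J1234": "J2875", "CH1234": "J2879"},
    ["employee_ids", "non_employee_ids", "friends_id"],
)
# With a configured client (not created here), the call would be roughly:
# es.update_by_query(index="main-index-v2", body=body, conflicts="proceed")
```

Restricting the query to documents that actually contain an old ID keeps the update from rewriting every document in the index; without the query, the script would still be correct but would reindex documents it does not change.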
Thanks to @Tomo_M for the solution!
Answered By - Jenobi