If you are using AWS DynamoDB for your application and looking for a way to speed up your queries drastically, you may well arrive at AWS Elasticsearch. In that case you will have to migrate your existing DynamoDB data to Elasticsearch and then keep both databases in sync. That is exactly what I am going to share with you in this article.
The code for this project can also be found in my GitHub repository. I have migrated tables containing almost 10 million records using this method.
High-level overview
- We are going to use the DynamoDB scan API to read the entire table page by page.
- On the Elasticsearch side, the bulk API will be used to upload the data in batches (see the sketch after this list).
- If we hit any error while uploading data to Elasticsearch, we save the response in an S3 bucket for further analysis.
- For syncing real-time data, we will make use of DynamoDB Streams, which lets a Lambda function update Elasticsearch whenever there is a change in DynamoDB.
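As a quick illustration of what the Elasticsearch bulk API expects, here is a minimal sketch; the endpoint, index name, and documents are placeholders, and request signing is left out (the full scripts below sign every request with AWS4Auth).
import json
import requests

host = "https://YOUR_ENDPOINT.ap-south-1.es.amazonaws.com"  # placeholder endpoint
index_name = "demo-table"                                   # placeholder index name
headers = {"Content-Type": "application/json"}

# The _bulk body is newline-delimited JSON: one action line, then the document itself.
docs = [{"guid": "1", "name": "first"}, {"guid": "2", "name": "second"}]
body = ""
for doc in docs:
    body += json.dumps({"index": {"_index": index_name, "_id": doc["guid"]}}) + "\n"
    body += json.dumps(doc) + "\n"

response = requests.post(host + "/_bulk", data=body, headers=headers)
print(response.text)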
Prerequisites
- A DynamoDB table that needs to be migrated to Elasticsearch
- An Elasticsearch index whose mapping matches the DynamoDB table (a minimal example follows this list).
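If the index does not exist yet, here is a minimal sketch of creating one with an explicit mapping; the index name and the fields (guid, businessDate) are assumptions based on the examples later in this article, so adapt them to your table. Signed requests (AWS4Auth) are again omitted for brevity.
import json
import requests

host = "https://YOUR_ENDPOINT.ap-south-1.es.amazonaws.com"  # placeholder endpoint
index_name = "demo-table"  # placeholder index name (Elasticsearch index names must be lowercase)
headers = {"Content-Type": "application/json"}

# Hypothetical mapping: "guid" is the unique key and "businessDate" a date attribute.
mapping = {
    "mappings": {
        "properties": {
            "guid": {"type": "keyword"},
            "businessDate": {"type": "date", "format": "yyyy-MM-dd"}
        }
    }
}
# PUT /<index> creates the index with the given mapping.
response = requests.put(host + "/" + index_name, data=json.dumps(mapping), headers=headers)
print(response.text)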
Data Sync
Before migrating the existing data, you need to set up the data sync so that Elasticsearch keeps receiving updates while the migration runs. To use the requests-aws4auth library in Lambda, you can make use of the layer which I have here. Download it and attach it to your function as a layer.
import json
import boto3
from requests_aws4auth import AWS4Auth
import logging
import sys
from boto3.dynamodb.types import TypeDeserializer
import decimal
import os
import uuid
deserializer = TypeDeserializer()
sys.path.append('/opt/python/requests')
import requests
# Get logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
my_session = boto3.session.Session()
region = my_session.region_name
service = "es"
credentials = my_session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
# Get url from environment variables
host = os.environ['HOST_NAME']
headers = { "Content-Type": "application/json" }
bulkUrl = host + "/_bulk"
tableIndexName = "INDEX-NAME"
tableUniqueKey = "guid"
upsertDataList = None
deleteDataList = None
listOfIdsToDelete = None
# to handle decimal fields from dynamodb
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 == 0:
                return int(o)
            else:
                return float(o)
        return super(DecimalEncoder, self).default(o)
def lambda_handler(event, context):
    # logger.info(event)
    try:
        upsertDataList = []
        deleteDataList = []
        logger.info("No of records : {0}".format(len(event['Records'])))
        for record in event['Records']:
            if record['eventName'] == "INSERT" or record['eventName'] == "MODIFY":
                upsertDataList.append(record)
            elif record['eventName'] == "REMOVE":
                deleteDataList.append(record)
        if len(upsertDataList) > 0:
            handleUpsert(upsertDataList)
        if len(deleteDataList) > 0:
            handleDelete(deleteDataList)
    except Exception as e:
        error = str(e) + " Error in lambda_handler(). Event : {0}".format(event)
        logger.error(error)
def handleUpsert(ListOfRecords):
    logger.info("inside handleUpsert()")
    document = ""
    for record in ListOfRecords:
        newImage = record["dynamodb"]["NewImage"]
        # logger.info(newImage)
        # convert the typed DynamoDB JSON into a plain python dict
        deserialised = {k: deserializer.deserialize(v) for k, v in newImage.items()}
        indexName = None
        uniqueKey = None
        if tableIndexName in record["eventSourceARN"]:
            indexName = tableIndexName
            uniqueKey = tableUniqueKey
        else:
            error = "Table does not have index in elasticsearch"
            logger.error(error)
            raise Exception(error)
        logger.info("indexName - {0} , uniqueKey - {1}".format(indexName, uniqueKey))
        if record['eventName'] == "MODIFY":
            logger.info("Update event")
            # logger.info("Old Data : " + str(record["dynamodb"]["OldImage"]))
        else:
            logger.info("Insert event")
        header = {
            "index": {
                "_index": indexName,
                "_id": str(deserialised[uniqueKey]).replace('"', "").replace("'", "")
            }
        }
        document += json.dumps(header) + "\n" + json.dumps(deserialised, cls=DecimalEncoder) + "\n"
    # uploading data to elasticsearch
    uploadToElasticSearch(document)
def handleDelete(ListOfRecords):
    logger.info("handling delete event")
    listOfIdsToDelete = []
    for record in ListOfRecords:
        OldImage = record["dynamodb"]["OldImage"]
        # logger.info(OldImage)
        if tableIndexName in record["eventSourceARN"]:
            deserialised = {k: deserializer.deserialize(v) for k, v in OldImage.items()}
            indexName = tableIndexName
            uniqueId = str(deserialised[tableUniqueKey]).replace('"', "").replace("'", "")
            listOfIdsToDelete.append(uniqueId)
        else:
            error = "Table does not have index in elasticsearch"
            logger.error(error)
            raise Exception(error)
    # delete data from elasticsearch
    if len(listOfIdsToDelete) > 0:
        deleteFromElasticSearch(tableIndexName, tableUniqueKey, listOfIdsToDelete)
def uploadToElasticSearch(document):
    logger.info("Inside uploadToElasticSearch()")
    try:
        response = requests.post(bulkUrl, auth=awsauth, data=document, headers=headers)
        if json.loads(response.text)['errors'] == True:
            logger.error("Error in uploading document")
            raise Exception(response.text)
        else:
            logger.info("Successfully uploaded data")
    except Exception as e:
        error = "Error in uploadToElasticSearch() - " + str(e)
        logger.error(error)
def deleteFromElasticSearch(indexName, uniqueKey, ids):
    logger.info("Inside deleteFromElasticSearch()")
    # string fields need the ".keyword" sub-field for an exact terms match
    if type(ids[0]) == str:
        uniqueKey = uniqueKey + ".keyword"
    try:
        payload = {
            "query": {
                "terms": {
                    uniqueKey: ids
                }
            }
        }
        response = requests.post(host + "/" + indexName + "/_delete_by_query", auth=awsauth, data=json.dumps(payload), headers=headers)
        if 'failures' in json.loads(response.text) and len(json.loads(response.text)['failures']) > 0:
            logger.error("Error in deleting document")
            logger.error(response.text)
        else:
            logger.info("Successfully Deleted data")
    except Exception as e:
        logger.error("Error in deleteFromElasticSearch() - " + str(e))
You can also copy the source code from this GitHub link. Create a Lambda function using this code and make sure to replace the index name, host name, and table unique key. A sample test event for the function is shown below.
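To test the function before wiring up the real stream, you can invoke it with a hand-built event that mimics a DynamoDB Streams record. This is only a sketch: the ARN, guid, and attribute values are made up, and the eventSourceARN must contain your table/index name for the handler's check to pass.
# A minimal, hand-crafted test event that mimics a DynamoDB Streams INSERT record
test_event = {
    "Records": [
        {
            "eventName": "INSERT",
            # the handler checks that the index name appears in this ARN
            "eventSourceARN": "arn:aws:dynamodb:ap-south-1:123456789012:table/INDEX-NAME/stream/2021-01-01T00:00:00.000",
            "dynamodb": {
                "NewImage": {
                    "guid": {"S": "test-guid-001"},
                    "businessDate": {"S": "2021-01-01"},
                    "amount": {"N": "42.5"}
                }
            }
        }
    ]
}
# Paste this JSON as a test event in the Lambda console (the function reads HOST_NAME from its environment).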
- Go to the DynamoDB table and navigate to the “Exports and streams” tab.
- Enable DynamoDB Streams.
- Scroll down to the “Trigger” section and click “Create trigger”. Add the Lambda function that you created (a boto3 sketch of these steps follows this list).
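If you prefer to script these console steps, here is a minimal sketch using boto3. The table name, function name, view type, and batch size are assumptions for this example; adjust them to your setup.
import boto3

dynamodb = boto3.client('dynamodb')
lambda_client = boto3.client('lambda')

# Enable streams on the table; NEW_AND_OLD_IMAGES provides both images,
# which the sync Lambda needs for upserts and deletes.
dynamodb.update_table(
    TableName='DEMO-TABLE',  # hypothetical table name
    StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}
)

# Look up the stream ARN and attach it to the sync Lambda as a trigger.
stream_arn = dynamodb.describe_table(TableName='DEMO-TABLE')['Table']['LatestStreamArn']
lambda_client.create_event_source_mapping(
    EventSourceArn=stream_arn,
    FunctionName='dynamodb-es-sync',  # hypothetical function name
    StartingPosition='TRIM_HORIZON',
    BatchSize=100
)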
Data Migration
Since data migration is a one-time process, it can be done in either of two ways.
- Executing the Python script on your local machine
- Executing the Python script through AWS Lambda (with the help of SQS to overcome Lambda's 15-minute time limit)
1. Executing the Python script on your local machine
import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
import decimal
from requests_aws4auth import AWS4Auth
import logging
import sys
import datetime
sys.path.append('/opt/python/requests')
import requests
# Get logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
my_session = boto3.session.Session()
region = my_session.region_name
service = "es"
credentials = my_session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
headers = { "Content-Type": "application/json" }
host = "https://YOUR_ENDPOINT.ap-south-1.es.amazonaws.com"
bulkUrl = host + "/_bulk"
fromDate = None
toDate = None
tableName = None
# for error handling
fileSuffix = 0
s3 = boto3.resource('s3')
bucketName = "BUCKET_NAME"
currentDate = datetime.datetime.today().strftime('%Y-%m-%d')
filesWithError = []
def lambda_handler(event, context):
    response = None
    fileSuffix = 0
    tableName = "DEMO-TABLE"
    uniqueId = "guid"
    index_name = "DEMO-TABLE"
    dateFieldName = "businessDate"
    table = boto3.resource('dynamodb', region_name="ap-south-1").Table(tableName)
    # fromDate = "2020-10-01" #uncomment this line if you want to filter by date
    # toDate = "2020-11-30" #uncomment this line if you want to filter by date
    while True:
        document = ""
        if not response:
            # Scan from the start.
            if fromDate is None:
                response = table.scan()
            else:
                response = table.scan(FilterExpression=Attr(dateFieldName).gte(fromDate) & Attr(dateFieldName).lte(toDate))
        else:
            # Scan from where you stopped previously.
            if fromDate is None:
                response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            else:
                response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], FilterExpression=Attr(dateFieldName).gte(fromDate) & Attr(dateFieldName).lte(toDate))
        if len(response["Items"]) > 0:
            for item in response["Items"]:
                header = {
                    "index": {
                        "_index": index_name,
                        "_id": str(item[uniqueId]).replace('"', "").replace("'", "")
                    }
                }
                document += json.dumps(header) + "\n" + json.dumps(item, cls=DecimalEncoder) + "\n"
                # break
            esResponse = requests.post(bulkUrl, auth=awsauth, data=document, headers=headers)
            # print(esResponse.text)
            if json.loads(esResponse.text)['errors'] == True:
                logger.error("Error in uploading document")
                # remove the below lines if you do not want to store the error files on S3
                fileSuffix += 1
                out_file = "es-sync-error/{0}/{1}/{0}_{2}.json".format(tableName, currentDate, fileSuffix)
                s3.Object(bucketName, out_file).put(Body=esResponse.text)
                filesWithError.append(out_file)
            else:
                print("Successfully uploaded data")
            # break
        # Stop the loop if no additional records are available.
        if 'LastEvaluatedKey' not in response:
            break
    if len(filesWithError) > 0:
        return filesWithError
    else:
        return "Successfully Uploaded"
# to handle decimal fields from dynamodb
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 == 0:
                return int(o)
            else:
                return float(o)
        return super(DecimalEncoder, self).default(o)
# remove the below lines if you are going to deploy it on AWS lambda
startTime = datetime.datetime.now()
lambda_handler("","")
endTime = datetime.datetime.now()
print("Time taken - {0}".format(endTime - startTime))
You can also copy the source code from this GitHub link.
- Install the dependencies boto3 and requests-aws4auth using the following pip commands:
pip install boto3
pip install requests-aws4auth
- Open a command prompt and run “aws configure” to set up your AWS credentials.
- Replace the host name, table name, and index name in the code and you are ready to start the migration.
2. Executing the Python script through AWS Lambda
Since Lambda has a timeout of 15 minutes, we make use of SQS: before the Lambda times out, it adds a message to the queue containing the continuation key (the last evaluated key of the scan), which triggers a fresh invocation that resumes from where the previous one stopped. I recommend ending the current Lambda instance (and adding the message to SQS) when the remaining time is less than 2 minutes, as sketched below.
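Here is a minimal sketch of that handoff pattern on its own, before the full script; the table name, queue URL, and threshold are assumptions for this example, and the Elasticsearch upload is elided.
import json
import boto3

sqs = boto3.client('sqs')
table = boto3.resource('dynamodb').Table('DEMO-TABLE')                      # hypothetical table
SQS_URL = "https://sqs.ap-south-1.amazonaws.com/123456789012/es-migration"  # hypothetical queue URL
TIME_OUT_MS = 2 * 60 * 1000                                                 # hand off when < ~2 minutes remain

def lambda_handler(event, context):
    # A re-invocation via SQS carries the key where the previous run stopped.
    last_key = None
    if "Records" in event:
        last_key = json.loads(event["Records"][0]["body"]).get("lastEvaluatedKey")
    while True:
        response = table.scan(ExclusiveStartKey=last_key) if last_key else table.scan()
        # ... build the _bulk payload from response["Items"] and post it to Elasticsearch ...
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break  # table fully scanned
        if context.get_remaining_time_in_millis() < TIME_OUT_MS:
            # Hand the continuation key to the next invocation via SQS and stop this one.
            # (default=str is a crude stand-in for the DecimalEncoder used in the full script.)
            sqs.send_message(QueueUrl=SQS_URL,
                             MessageBody=json.dumps({"lastEvaluatedKey": last_key}, default=str))
            break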
As before, to use the requests-aws4auth library you can make use of the layer which I have here. Download it and attach it to your function as a layer.
import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
import decimal
from decimal import *
from requests_aws4auth import AWS4Auth
import logging
import sys
import datetime
import os
import uuid
sys.path.append('/opt/python/requests')
import requests
# Get logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
tableName = None
uniqueId = None
currentDate = datetime.datetime.today().strftime('%Y-%m-%d')
index_name = None
dateFieldName = None
my_session = boto3.session.Session()
region = my_session.region_name
service = "es"
credentials = my_session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
headers = { "Content-Type": "application/json" }
# Get url from environment variables
timeOut = int(os.environ['TIME_OUT'])
sqsUrl = os.environ['SQS_URL']
bucketName = os.environ['BUCKET_NAME']
host = os.environ['HOST_NAME']
tableName = os.environ['TABLE_NAME']
uniqueId = "guid"
index_name = os.environ['INDEX_NAME']
bulkUrl = host + "/_bulk"
table = boto3.resource('dynamodb').Table(tableName)
# for error handling
filePrefix = 0
s3 = boto3.resource('s3')
# sqsClient = session.client(service_name='sqs', endpoint_url='https://sqs.us-east-1.amazonaws.com') #if lambda inside vpc
sqsClient = boto3.client('sqs')
session = boto3.Session()
currentDate = datetime.datetime.today().strftime('%Y-%m-%d')
filesWithError = []
def lambda_handler(event, context):
    try:
        logger.info(event)
        if ("Records" in event) and (len(event["Records"]) > 0):
            logger.info("Trigger through SQS.")
            for record in event["Records"]:
                # the SQS message body was produced with json.dumps, so parse it back with json.loads
                event = json.loads(record["body"])
        else:
            logger.info("Triggered manually.")
        segmentNumber = int(event["segmentNumber"])
        if 'filePrefix' in event:
            filePrefix = int(event["filePrefix"])
        else:
            filePrefix = 0
        response = None
        lastEvaluatedKey = None
        fileSuffixFieldName = uniqueId
        if "lastEvaluatedKey" in event:
            response = {}
            # restore the key as a dict whether it arrives as a dict or as its string representation
            response['LastEvaluatedKey'] = eval(json.loads(json.dumps(str(event["lastEvaluatedKey"]))))
        s3Folder = "es-sync/" + tableName + "/" + currentDate + "/"
        isDataPresent = True
        while isDataPresent:
            document = ""
            out_file = ""
            if not response:
                # Scan from the start.
                response = table.scan()
            else:
                # Scan from where you stopped previously.
                response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            if 'LastEvaluatedKey' in response:
                logger.info("Last evaluated key - {0}, Count - {1}".format(response['LastEvaluatedKey'], response['Count']))
                lastEvaluatedKey = response['LastEvaluatedKey']
            else:
                lastEvaluatedKey = "FINAL SET OF RECORDS"
            if len(response["Items"]) > 0:
                for item in response["Items"]:
                    header = {
                        "index": {
                            "_index": index_name,
                            "_id": str(item[uniqueId]).replace('"', "").replace("'", "")
                        }
                    }
                    document += json.dumps(header) + "\n" + json.dumps(item, cls=DecimalEncoder) + "\n"
                esResponse = requests.post(bulkUrl, auth=awsauth, data=document, headers=headers)
                filePrefix += 1
                filePrefixString = getPrefixString(filePrefix)  # getting a five-digit, zero-padded string
                if 'LastEvaluatedKey' in response:
                    out_file = '{0}{1}_{2}.json'.format(s3Folder, filePrefixString, response['LastEvaluatedKey'][fileSuffixFieldName])
                else:
                    out_file = '{0}{1}_{2}.json'.format(s3Folder, filePrefixString, "finalRecord")
                # adding record in s3 bucket
                s3.Object(bucketName, out_file).put(Body=document)
                if json.loads(esResponse.text)['errors'] == True:
                    logger.error("Error in uploading document")
                    if 'LastEvaluatedKey' in response:
                        out_file = 'es-sync-error/{0}/{1}/{2}_{3}.json'.format(tableName, currentDate, filePrefixString, response['LastEvaluatedKey'][fileSuffixFieldName])
                    else:
                        out_file = 'es-sync-error/{0}/{1}/{2}_{3}.json'.format(tableName, currentDate, filePrefixString, "finalRecord")
                    s3.Object(bucketName, out_file).put(Body=esResponse.text)
                    filesWithError.append(out_file)
                    error = "Error while uploading data to elasticsearch. Error File upload to s3 in {0}. Event - {1}".format(out_file, event)
                    logger.error(error)
                else:
                    logger.info("Successfully uploaded data")
                    # break
            # get remaining processing time
            remainingTime = context.get_remaining_time_in_millis()
            logger.info("Remaining time - {0} ms ".format(remainingTime))
            if remainingTime < timeOut:
                logger.info("Lambda remaining time is less than 2 mins. So adding lastEvaluatedKey in sqs and ending this lambda.")
                segmentNumber += 1
                payload = {
                    "tableName": tableName,
                    "lastEvaluatedKey": lastEvaluatedKey,
                    "segmentNumber": segmentNumber,
                    "filePrefix": filePrefix
                }
                response = sqsClient.send_message(QueueUrl=sqsUrl, MessageBody=json.dumps(payload, cls=DecimalEncoder))
                logger.info("lastEvaluatedKey added in sqs.")
                isDataPresent = False
                break
            # Stop the loop if no additional records are available.
            if 'LastEvaluatedKey' not in response:
                logger.info("No more records are present.")
                isDataPresent = False
                break
            else:
                logger.info("Last evaluated key - {0}".format(response['LastEvaluatedKey']))
            del document
            del out_file
        if len(filesWithError) > 0:
            logger.info("File with error - {0}".format(filesWithError))
            return filesWithError
        else:
            logger.info("Successfully Uploaded without error in elasticsearch bulk upload.")
            return "Successfully Uploaded"
    except Exception as e:
        error = str(e) + " Error in lambda_handler(). Event : {0}".format(event)
        logger.error(error)
def getPrefixString(filePrefix):
    if filePrefix < 10:
        filePrefixString = "0000" + str(filePrefix)
    elif filePrefix < 100:
        filePrefixString = "000" + str(filePrefix)
    elif filePrefix < 1000:
        filePrefixString = "00" + str(filePrefix)
    else:
        filePrefixString = "0" + str(filePrefix)
    return filePrefixString
# to handle decimal fields from dynamodb
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 == 0:
                return int(o)
            else:
                return float(o)
        return super(DecimalEncoder, self).default(o)
You can also copy the source code from this GitHub link.
Happy Programming!!