Вот и возникла такая необходимость по работе. Еще даже до начала процесса разработки самой лямбды сопутствующие вопросы по запуску этого всего в облаке даже немного подпугивали 😉
Читать далееСразу скажу, что для данной статьи я буду повторять все действия, даже просто для самого себя, что бы потом оно получилось уже быстрее. Наверное самый большой камень преткновения стал бы вопрос с настройкой прав доступа роли лямбды которая создается в момент её создания.
Для большего удобства я удалил все данные из аккаунта и выходит как-бы с нуля …
1. Создаем Lambda Function
Механика данного процесса предполагает наличие в выходных данных некоторой структуры, которая далее будет перехватываться Kinesis Firehose и далее он самостоятельно направит данные в Delivery Stream. Но обо всем по порядку
Переходим к AWS Lambda Functions
Жмем кнопку
Настройки устанавливаем такие:
Сразу после того, как процесс создания окончится будет открыт файл lambda_function.py содержимое которого мы меняем на следующее:
"""
@Author: Ihor
@Date: 10.11.2022
@Version: 1.00
Sample LambdaFunction to stream data from Kinesis through Lambda function to S3 Bucket with Partitioned Keys
"""
import base64
import datetime
import json
def handle_data(data: dict, table_name: str, record_id: str) -> dict:
return store_object({
'tableName': table_name,
'recordId': record_id,
'data': data
})
def store_object(r_data: dict) -> dict:
_created_at_asis = r_data['data']['timestamp'] * 1000
_event_timestamp = datetime.datetime.utcfromtimestamp(
_created_at_asis / 1000
).replace(
microsecond=_created_at_asis % 1000
)
_partition_keys = {
"tableName": r_data['tableName'],
"year": _event_timestamp.strftime('%Y'),
"month": _event_timestamp.strftime('%m'),
"day": _event_timestamp.strftime('%d'),
"hour": _event_timestamp.strftime('%H')
}
_firehose_record_output = {'recordId': r_data['recordId'],
'tableName': r_data['tableName'],
'data': base64.b64encode(
json.dumps(
r_data['data']
).encode('utf-8')
).decode('utf-8'),
'result': 'Ok',
'metadata': {'partitionKeys': _partition_keys}}
return _firehose_record_output
def lambda_handler(event, context: None):
__kinesis_output_records = {"records": []}
if 'Records' in event or 'records' in event:
for _event in event['Records' if 'Records' in event else 'records']:
_data = _event['Data'] if 'Data' in _event else _event['data']
_data = json.loads(base64.b64decode(_data, validate=True))
__kinesis_output_records['records'].append(
handle_data(data=_data,
table_name=_event['tableName'],
record_id=_event['recordId'])
)
print(__kinesis_output_records)
return __kinesis_output_records
else:
return {
'status': 400,
'text': 'Malformed data on input. Input data from Kinesis will be a list of records,'
'such as {"records":[{obj},{obj},.....]}'
}
Ну и да, жмем — Deploy
Теперь настроим тестовые данные и попробуем что бы все запускалось без ошибок. Для этого перейдем во вкладку Test
Далее копируем следующее:
{"records": [
{
"recordId": "49634073611871172072718215979886834075896782645243150338000000",
"approximateArrivalTimestamp": 1667837631848,
"data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5OTksCgkib2JqZWN0IjogInBvc3QiLAogICAgICAgICJhY3Rpb24iOiAiY3JlYXRlIiwKCSJwb3N0X2lkIjogImNhZDg4MDFmLTM3Z2ItMzI2MC16MTkyLTFhYjBhMjRmM2ExNyIsCgkidGltZXN0YW1wIjogMTY2NzU3Njk5OSwKCSJ1c2VyIjogIkJpc2giLAoJIlNpemVCeXRlcyI6IDM1Cn0=",
"tableName": "links",
"kinesisRecordMetadata": {
"sequenceNumber": "49634673611871172078818215979886834075896733645243150338",
"subsequenceNumber": 0,
"partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
"shardId": "shardId-000000000000",
"approximateArrivalTimestamp": 1667837631848
}
},
{
"recordId": "49634073611871172072718215979886834075896782645243150338000001",
"approximateArrivalTimestamp": 1667837731848,
"data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5MzgsCgkib2JqZWN0IjogImxpbmsiLAogICAgICAgICJhY3Rpb24iOiAiY3JlYXRlIiwKCSJ0aW1lc3RhbXAiOiAxNjY3NTc2OTk5LAoJInVzZXIiOiAiQmlzaCIsCgkiU2l6ZUJ5dGVzIjogNTkKfQ==",
"tableName": "links",
"kinesisRecordMetadata": {
"sequenceNumber": "49634073611871172072718215979886834075896782645243150338",
"subsequenceNumber": 0,
"partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
"shardId": "shardId-000000000000",
"approximateArrivalTimestamp": 1667837831848
}
},
{
"recordId": "49634073611871172072718215979886834075896782645243150338000002",
"approximateArrivalTimestamp": 1667837931848,
"data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5OTksCgkib2JqZWN0IjogInBvc3QiLAogICAgICAgICJhY3Rpb24iOiJjcmVhdGUiLAoJInBvc3RfaWQiOiAiY2FkODgwMWYtMzdnYi0zMjYwLXoxOTItMWFiMGEyNGYzYTE3IiwKCSJ0aXRsZSI6ICJIZWxsbyB3b3JsZCEiLAoJInRleHQiOiAiVGV4dCBCb2R5IG9mIG91ciBwb3N0IiwKCSJ0aW1lc3RhbXAiOiAxNjY3NTc2OTk5LAoJInVzZXIiOiAiQmlzaCIsCgkiU2l6ZUJ5dGVzIjogNjUKfQ==",
"tableName": "posts",
"kinesisRecordMetadata": {
"sequenceNumber": "49634073611871172072718215979886834075896782645243150338",
"subsequenceNumber": 0,
"partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
"shardId": "shardId-000000000000",
"approximateArrivalTimestamp": 1667839931848
}
},
{
"recordId": "49634073611871172072718215979886834075896782645243150338000003",
"approximateArrivalTimestamp": 1667937631848,
"data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5OTksCgkib2JqZWN0IjogInBvc3QiLAogICAgICAgICJhY3Rpb24iOiJtb2RpZnkiLAoJInBvc3RfaWQiOiAiY2FkODgwMWYtMzdnYi0zMjYwLXoxOTItMWFiMGEyNGYzYTE3IiwKCSJuZXdfdGl0bGUiOiAiQmVzdCBQb3N0IEV2ZXIhIiwKCSJuZXdfdGV4dCI6ICJCZXN0IHRleHQgZm9yIGJlc3QgUE9TVCEiLAoJInRpbWVzdGFtcCI6IDE2Njc1Nzk5OTksCgkidXNlciI6ICJCaXNoIiwKCSJTaXplQnl0ZXMiOiAxMTEKfQ==",
"tableName": "posts",
"kinesisRecordMetadata": {
"sequenceNumber": "49634073611871172072718215979886834075896782645243150338",
"subsequenceNumber": 0,
"partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
"shardId": "shardId-000000000000",
"approximateArrivalTimestamp": 1667837631848
}
},
{
"recordId": "49634073611871172072718215979886834075896782645243150338000004",
"approximateArrivalTimestamp": 1667837631848,
"data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5OTksCgkib2JqZWN0IjogInBvc3QiLAogICAgICAgICJhY3Rpb24iOiJkZWxldGUiLAoJInBvc3RfaWQiOiAiY2FkODgwMWYtMzdnYi0zMjYwLXoxOTItMWFiMGEyNGYzYTE3IiwKCSJ0aW1lc3RhbXAiOiAxNjY3NTc2OTk5LAoJInVzZXIiOiAiQmlzaCIsCgkiU2l6ZUJ5dGVzIjogMzUKfQ==",
"tableName": "posts",
"kinesisRecordMetadata": {
"sequenceNumber": "49634073611871172072718215979886834075896782645243150338",
"subsequenceNumber": 0,
"partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
"shardId": "shardId-000000000000",
"approximateArrivalTimestamp": 1668837631848
}
}
]}
Вставляем все это в поле Event JSON, а заодно и имя не забываем дать нашему событию Event name
на данном этапе достаточно будет просто нажать Save
Вернемся обратно к коду, нажав на Code
Нажмем на кнопочку Test
И мы увидим ответ от функции. И шо наша хохма с тестовыми данными таки да — работает 😉
На этом этапе функция завершена, оставим ее в покое.
2. Создаем S3 Bucket
Для тех, кто в курсе этого процесса — можно пропустить это пункт, для остальных идем далее
Перемещаемся в AWS S3 Bucket
Жмем:
Но тут тоже все просто — зададим имя для корзины
И жмем
в самом конце страницы