0

[DONE] Kinesis + Lambda function + S3

Вот и возникла такая необходимость по работе. Еще даже до начала процесса разработки самой лямбды сопутствующие вопросы по запуску этого всего в облаке даже немного подпугивали 😉

Читать далее

Сразу скажу, что для данной статьи я буду повторять все действия, даже просто для самого себя, что бы потом оно получилось уже быстрее. Наверное самый большой камень преткновения стал бы вопрос с настройкой прав доступа роли лямбды которая создается в момент её создания.

Для большего удобства я удалил все данные из аккаунта и выходит как-бы с нуля …

1. Создаем Lambda Function

Механика данного процесса предполагает наличие в выходных данных некоторой структуры, которая далее будет перехватываться Kinesis Firehose и далее он самостоятельно направит данные в Delivery Stream. Но обо всем по порядку

Переходим к AWS Lambda Functions

Жмем кнопку

Настройки устанавливаем такие:

Сразу после того, как процесс создания окончится будет открыт файл lambda_function.py содержимое которого мы меняем на следующее:

"""
@Author: Ihor
@Date: 10.11.2022
@Version: 1.00

Sample LambdaFunction to stream data from Kinesis through Lambda function to S3 Bucket with Partitioned Keys

"""

import base64
import datetime
import json


def handle_data(data: dict, table_name: str, record_id: str) -> dict:
    return store_object({
        'tableName': table_name,
        'recordId': record_id,
        'data': data
    })


def store_object(r_data: dict) -> dict:
    _created_at_asis = r_data['data']['timestamp'] * 1000
    _event_timestamp = datetime.datetime.utcfromtimestamp(
        _created_at_asis / 1000
    ).replace(
        microsecond=_created_at_asis % 1000
    )

    _partition_keys = {
        "tableName": r_data['tableName'],
        "year": _event_timestamp.strftime('%Y'),
        "month": _event_timestamp.strftime('%m'),
        "day": _event_timestamp.strftime('%d'),
        "hour": _event_timestamp.strftime('%H')
    }
    _firehose_record_output = {'recordId': r_data['recordId'],
                               'tableName': r_data['tableName'],
                               'data': base64.b64encode(
                                   json.dumps(
                                       r_data['data']
                                   ).encode('utf-8')
                               ).decode('utf-8'),
                               'result': 'Ok',
                               'metadata': {'partitionKeys': _partition_keys}}
    return _firehose_record_output


def lambda_handler(event, context: None):
    __kinesis_output_records = {"records": []}

    if 'Records' in event or 'records' in event:
        for _event in event['Records' if 'Records' in event else 'records']:

            _data = _event['Data'] if 'Data' in _event else _event['data']

            _data = json.loads(base64.b64decode(_data, validate=True))

            __kinesis_output_records['records'].append(
                handle_data(data=_data,
                            table_name=_event['tableName'],
                            record_id=_event['recordId'])
            )

        print(__kinesis_output_records)
        return __kinesis_output_records

    else:
        return {
            'status': 400,
            'text': 'Malformed data on input. Input data from Kinesis will be a list of records,'
                    'such as {"records":[{obj},{obj},.....]}'
        }

Ну и да, жмем — Deploy

Теперь настроим тестовые данные и попробуем что бы все запускалось без ошибок. Для этого перейдем во вкладку Test
Далее копируем следующее:

{"records": [
  {
      "recordId": "49634073611871172072718215979886834075896782645243150338000000",
      "approximateArrivalTimestamp": 1667837631848,
      "data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5OTksCgkib2JqZWN0IjogInBvc3QiLAogICAgICAgICJhY3Rpb24iOiAiY3JlYXRlIiwKCSJwb3N0X2lkIjogImNhZDg4MDFmLTM3Z2ItMzI2MC16MTkyLTFhYjBhMjRmM2ExNyIsCgkidGltZXN0YW1wIjogMTY2NzU3Njk5OSwKCSJ1c2VyIjogIkJpc2giLAoJIlNpemVCeXRlcyI6IDM1Cn0=",
      "tableName": "links",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49634673611871172078818215979886834075896733645243150338",
        "subsequenceNumber": 0,
        "partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1667837631848
      }
    },
  {
      "recordId": "49634073611871172072718215979886834075896782645243150338000001",
      "approximateArrivalTimestamp": 1667837731848,
      "data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5MzgsCgkib2JqZWN0IjogImxpbmsiLAogICAgICAgICJhY3Rpb24iOiAiY3JlYXRlIiwKCSJ0aW1lc3RhbXAiOiAxNjY3NTc2OTk5LAoJInVzZXIiOiAiQmlzaCIsCgkiU2l6ZUJ5dGVzIjogNTkKfQ==",
      "tableName": "links",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49634073611871172072718215979886834075896782645243150338",
        "subsequenceNumber": 0,
        "partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1667837831848
      }
    },
  {
      "recordId": "49634073611871172072718215979886834075896782645243150338000002",
      "approximateArrivalTimestamp": 1667837931848,
      "data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5OTksCgkib2JqZWN0IjogInBvc3QiLAogICAgICAgICJhY3Rpb24iOiJjcmVhdGUiLAoJInBvc3RfaWQiOiAiY2FkODgwMWYtMzdnYi0zMjYwLXoxOTItMWFiMGEyNGYzYTE3IiwKCSJ0aXRsZSI6ICJIZWxsbyB3b3JsZCEiLAoJInRleHQiOiAiVGV4dCBCb2R5IG9mIG91ciBwb3N0IiwKCSJ0aW1lc3RhbXAiOiAxNjY3NTc2OTk5LAoJInVzZXIiOiAiQmlzaCIsCgkiU2l6ZUJ5dGVzIjogNjUKfQ==",
      "tableName": "posts",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49634073611871172072718215979886834075896782645243150338",
        "subsequenceNumber": 0,
        "partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1667839931848
      }
    },
  {
      "recordId": "49634073611871172072718215979886834075896782645243150338000003",
      "approximateArrivalTimestamp": 1667937631848,
      "data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5OTksCgkib2JqZWN0IjogInBvc3QiLAogICAgICAgICJhY3Rpb24iOiJtb2RpZnkiLAoJInBvc3RfaWQiOiAiY2FkODgwMWYtMzdnYi0zMjYwLXoxOTItMWFiMGEyNGYzYTE3IiwKCSJuZXdfdGl0bGUiOiAiQmVzdCBQb3N0IEV2ZXIhIiwKCSJuZXdfdGV4dCI6ICJCZXN0IHRleHQgZm9yIGJlc3QgUE9TVCEiLAoJInRpbWVzdGFtcCI6IDE2Njc1Nzk5OTksCgkidXNlciI6ICJCaXNoIiwKCSJTaXplQnl0ZXMiOiAxMTEKfQ==",
      "tableName": "posts",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49634073611871172072718215979886834075896782645243150338",
        "subsequenceNumber": 0,
        "partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1667837631848
      }
    },
  {
      "recordId": "49634073611871172072718215979886834075896782645243150338000004",
      "approximateArrivalTimestamp": 1667837631848,
      "data": "ewoJIkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2Njc1NzY5OTksCgkib2JqZWN0IjogInBvc3QiLAogICAgICAgICJhY3Rpb24iOiJkZWxldGUiLAoJInBvc3RfaWQiOiAiY2FkODgwMWYtMzdnYi0zMjYwLXoxOTItMWFiMGEyNGYzYTE3IiwKCSJ0aW1lc3RhbXAiOiAxNjY3NTc2OTk5LAoJInVzZXIiOiAiQmlzaCIsCgkiU2l6ZUJ5dGVzIjogMzUKfQ==",
      "tableName": "posts",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49634073611871172072718215979886834075896782645243150338",
        "subsequenceNumber": 0,
        "partitionKey": "E9512A4081B278E3F4030ABCEC1B52CA",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1668837631848
      }
    }
]}

Вставляем все это в поле Event JSON, а заодно и имя не забываем дать нашему событию Event name

на данном этапе достаточно будет просто нажать Save

Вернемся обратно к коду, нажав на Code

Нажмем на кнопочку Test

И мы увидим ответ от функции. И шо наша хохма с тестовыми данными таки да — работает 😉

На этом этапе функция завершена, оставим ее в покое.

2. Создаем S3 Bucket

Для тех, кто в курсе этого процесса — можно пропустить это пункт, для остальных идем далее

Перемещаемся в AWS S3 Bucket

Жмем:

Но тут тоже все просто — зададим имя для корзины

И жмем

в самом конце страницы