The meaningfulness of Events via standardization (Part 3)

In the previous parts, we discovered how to settle on an event standard and how to reuse the AsyncAPI specification to validate an event.

A spec-first approach simplifies defining the contract and its promises in a collaborative manner, helping both technical and business-oriented staff reach an agreement and think around a contract.

The specification must be the core of communication

Producer

The producer is the generator of events, and its most important responsibility is to respect the promise defined in the specification.

Here is how the producer generates an event:

    const bodyJson = JSON.parse(record.body);
    const data = { entityId: id, price: bodyJson.price, quantity: bodyJson.quantity };
    const integration = CreateIntegrationEvent(
        data,
        EventType.Created,
        "1.0.0"
    );

    // Await the validation so an invalid event stops the handler before publishing
    await validate(integration)
        .then(() => console.log("Event respects the Order Notification Specifications"))
        .catch((error) => logErrors(error));

    await PublishEvent(integration);

In the above snippet, the producer receives a message, generates an integration event, validates the generated event, and publishes that event to a broker.

The generated event can be invalid; without validation on the producer side, the problem only surfaces on the consumer side, letting a single defect propagate to multiple services while staying hidden at its source.
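For illustration, suppose the specification marks price as required (an assumption here, not part of the snippet above); an event built without it would then fail validation on the producer and never reach the broker:

    // Sketch: reusing the helpers above; assumes the spec requires `price`
    const invalid = CreateIntegrationEvent(
        { entityId: id, quantity: 3 },   // price omitted on purpose
        EventType.Created,
        "1.0.0"
    );

    try {
        await validate(invalid);
        await PublishEvent(invalid);     // never reached for an invalid event
    } catch (error) {
        logErrors(error);                // surfaces the validation errors on the producer side
    }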

Consumer

The consumer subscribes to a broker and waits for events. Ideally, the consumer receives an already-validated event and can process it without any further inconvenience.

A better approach is to let the consumer validate the received event as well, so any invalid event is identified at the first step.

    const recordBody = JSON.parse(event.Records[0].body);
    console.log(`happy to receive the integration event ${recordBody.type} for Id: ${recordBody.id} from ${recordBody.source}`);
    console.log(`validating version ${recordBody.dataVersion} as: \n ${JSON.stringify(recordBody.data, null, 4)} `);

    // The validation will be executed against the dataSchema already provided by the producer,
    // choosing the right validator based on the event's dataVersion
    // This is a demonstration example
    const importedModule = await import(`../../shared/streams/CAPABILITY/${recordBody.dataVersion}/validator`);

    await importedModule.validate(recordBody)
        .then(() => console.log("Event respects the enterprise specification"))
        .catch(() => console.log("an error occurred while validating the event"));

In the above example, the consumer validates the event based on the dataVersion carried by the incoming event, dynamically importing the matching validator.
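This relies on a per-version folder layout. An illustrative structure, assumed from the import path used above:

    // shared/streams/CAPABILITY/        (assumed layout)
    //     1.0.0/
    //         schema.ts      // generated from the AsyncAPI file (see below)
    //         validator.ts   // wires SchemaValidator to the 1.0.0 schema
    //     2.0.0/
    //         schema.ts
    //         validator.ts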

Validator

We had a brief example of validating against a specification YAML file, but that approach is not a best practice: it reads from disk at runtime and can hurt performance depending on I/O conditions.

Let's refine the validator:

import Ajv from "ajv";
import addFormats from "ajv-formats";

const ajvOptions = { allErrors: true };

class SchemaValidator {
    validate = async (event: any, schema: any) => {
        return this.AjvValidate(event, schema, event.dataSchema?.split('#')[0], event.dataSchema);
    }

    private AjvValidate(...) {
        // Omitted
    }
}

export { SchemaValidator }
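The AjvValidate body is omitted above. A minimal sketch of what it could look like, assuming dataSchema carries a $ref into the schema generated below and that errors are serialized so logErrors can parse them (both assumptions):

    // Inside SchemaValidator — an illustrative body for the omitted method
    private AjvValidate(event: any, schema: any, schemaId?: string, dataSchema?: string) {
        const ajv = addFormats(new Ajv(ajvOptions));
        // Register the generated schema under its id, then validate against
        // the exact reference the producer declared in dataSchema
        ajv.addSchema(schema, schemaId);
        const valid = ajv.validate(dataSchema ?? schemaId!, event);
        if (!valid) {
            // Serialize the Ajv errors so logErrors can JSON.parse them later
            throw new Error(JSON.stringify(ajv.errors));
        }
    }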

The above code is a generic schema validator; the validate method expects a schema object, so we need to generate a TypeScript const from the AsyncAPI file.

import { SchemaParser } from "./asyncapi/schema-parser";
import * as fs from "fs";

const path = "./CAPABILITY";
const version = "1.0.0";

const parser = new SchemaParser(path);
Promise.resolve(parser.initSchema(version)).then(
    () => {
        fs.writeFileSync(`${path}/${version}/schema.ts`, 'export const schema = ' + JSON.stringify(parser.schema))
    }
).catch(
    (err) => console.error(err)
);

The above snippet simply generates the schema object in TypeScript from the AsyncAPI specification.
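The generated file could look roughly like this (an illustrative shape based on the order payload used by the producer above, not the actual generated output):

    // CAPABILITY/1.0.0/schema.ts (illustrative)
    export const schema = {
        type: "object",
        required: ["entityId", "price", "quantity"],
        properties: {
            entityId: { type: "string" },
            price: { type: "number" },
            quantity: { type: "integer" }
        }
    };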

Now let's use that generated schema object to validate events at runtime, based on the produced event's data.

import { SchemaValidator } from '../../asyncapi/schema-validator';

const validator = new SchemaValidator();

const validate = async (event: any) => {
    const { schema } = await import(`../CAPABILITY/${event.dataVersion}/schema`);
    return validator.validate(event, schema);
}

export { validate };

This validator can be shared between the producer and the consumer so that both sides validate the events.

Design

In this series, we use Terraform for infrastructure as code.

The source code can be found on GitHub.

The producer Lambda handler code is:

import { Handler, SQSEvent } from "aws-lambda";
import { nanoid } from "nanoid";
import { validate } from "../../docs/streams/CAPABILITY/validator";
import { CreateIntegrationEvent, CreateNotificationEvent, EventType } from "@shared/models/models";
import { PublishEvent } from "@shared/adapters/events-publisher";

const lambdaHandler: Handler = async (event: SQSEvent): Promise<void> => {

    console.log(event);
    const record = event.Records[0];
    const id = nanoid();

    const notification = CreateNotificationEvent({ entityId: id }, EventType.Created, "1.0.0");

    // Await the validation so an invalid notification stops the handler
    await validate(notification)
        .then(() => console.log("Event respects the Order Notification Specifications"))
        .catch((error) => logErrors(error));

    await PublishEvent(notification);

    const bodyJson = JSON.parse(record.body);
    const data = { entityId: id, price: bodyJson.price, quantity: bodyJson.quantity };
    const integration = CreateIntegrationEvent(
        data,
        EventType.Created,
        "1.0.0"
    );

    await validate(integration)
        .then(() => console.log("Event respects the Order Notification Specifications"))
        .catch((error) => logErrors(error));

    await PublishEvent(integration);
};

function logErrors(error: any) {
    const msg = { Errors: JSON.parse(error.message) };
    console.log(msg);
    throw new Error(error);
}

export const handler = lambdaHandler;
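Note that the producer publishes two events per incoming message: a lightweight notification event carrying only the entityId, and a richer integration event carrying the order data; both are validated against the specification before being published.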

And the consumer handler looks like:

import { Handler, SQSEvent } from "aws-lambda";
import { validate } from "../../docs/streams/CAPABILITY/validator";

const lambdaHandler: Handler = async (event: SQSEvent): Promise<void> => {
    const recordBody = JSON.parse(event.Records[0].body);

    console.log(`happy to receive the integration event ${recordBody.type} for Id: ${recordBody.id} from ${recordBody.source}`);
    console.log(`validating version ${recordBody.dataVersion} as: \n ${JSON.stringify(recordBody.data, null, 4)} `);

    // Await the validation so a failure is reported before the handler returns
    await validate(recordBody)
        .then(() => console.log("Event respects the enterprise specification"))
        .catch(() => console.log("an error occurred while validating the event"));
};

export const handler = lambdaHandler;

Conclusion

The API-first movement introduced the contract-first approach, and in this series we applied it to event-driven design.

Relying on a contract helps teams settle on best practices at the enterprise level.

In this part, we used the definitions from parts 1 & 2 to prepare a use-case-based project and applied the contract and its promises on both the producer and consumer sides.

In traditional design, the promise is the producer's responsibility, but it is a best practice to double-check and revalidate events on the consumer side.

We simplified the validation process by sharing a base validation process between the consumer and producer; this can also be achieved with a shared library.

In the next part, we will focus on filtering and consumption optimization.