Added products

This commit is contained in:
Benoy Bose 2025-01-09 21:57:57 +05:30
parent c91003faa6
commit cd9f800a99
3 changed files with 85 additions and 45 deletions

8
queries.js Normal file
View File

@ -0,0 +1,8 @@
import dotenv from "dotenv";
dotenv.config();
export const productQuery = `SELECT DISTINCT
line_item_resource_id AS resourceId,
line_item_product_code AS productCode,
line_item_usage_account_id AS accountId
FROM ${process.env.ATHENA_CU_TABLE};`;

View File

@ -1,12 +1,10 @@
import Fastify from "fastify";
import sequelizePlugin from "./plugins/sequelize.js";
import dotenv from "dotenv";
import { executeQueryAsync, retrieveResultsAsync } from "./services/athena.js";
dotenv.config();
import * as queries from "./queries.js";
import { S3Client, ListBucketsCommand } from "@aws-sdk/client-s3";
const s3Client = new S3Client({ region: process.env.AWS_REGION, profile: 'default' });
const listCommand = new ListBucketsCommand({});
const response = await s3Client.send(listCommand);
const server = Fastify({ logger: true });
server.register(sequelizePlugin);
@ -17,49 +15,13 @@ server.get("/", async (request, reply) => {
return { hello: "world" };
});
const productQuery = `SELECT DISTINCT
line_item_resource_id AS resourceId,
line_item_product_code AS productCode,
line_item_usage_account_id AS accountId
FROM ${process.env.ATHENA_CU_TABLE};`;
import { AthenaClient, StartQueryExecutionCommand, GetQueryExecutionCommand, GetQueryResultsCommand } from "@aws-sdk/client-athena";
const athenaClient = new AthenaClient({ region: process.env.AWS_REGION, profile: 'default' });
const startQueryCommand = new StartQueryExecutionCommand({
QueryString: productQuery,
QueryExecutionContext: { Database: process.env.ATHENA_CU_DATABASE },
ResultConfiguration: { OutputLocation: process.env.ATHENA_OUTPUT_S3_BUCKET },
server.get("/products", async (request, reply) => {
const query = queries.productQuery;
const queryExecutionId = await executeQueryAsync(query);
const results = await retrieveResultsAsync(queryExecutionId);
return { results };
});
const startQueryResponse = await athenaClient.send(startQueryCommand);
const queryExecutionId = startQueryResponse.QueryExecutionId;
let queryExecutionStatus;
do {
const getQueryExecutionCommand = new GetQueryExecutionCommand({
QueryExecutionId: queryExecutionId,
});
const queryExecutionResponse = await athenaClient.send(getQueryExecutionCommand);
queryExecutionStatus = queryExecutionResponse.QueryExecution.Status.State;
if (queryExecutionStatus === "FAILED") {
console.error(`Query Failed: ${JSON.stringify(queryExecutionResponse)}`);
break;
}
await new Promise((resolve) => setTimeout(resolve, 5000));
} while (queryExecutionStatus !== "SUCCEEDED");
const getQueryResultsCommand = new GetQueryResultsCommand({
QueryExecutionId: queryExecutionId,
});
console.log(getQueryResultsCommand);
const result = await athenaClient.send(getQueryResultsCommand);
console.log(result.ResultSet.Rows);
try {
await server.listen({ port: 3000 })
} catch (err) {

70
services/athena.js Normal file
View File

@ -0,0 +1,70 @@
import { AthenaClient, StartQueryExecutionCommand, GetQueryExecutionCommand, GetQueryResultsCommand } from "@aws-sdk/client-athena";
const athenaClient = new AthenaClient({ region: process.env.AWS_REGION, profile: 'default' });
const executionTimeout = 1000;
const productQuery = `SELECT DISTINCT
line_item_resource_id AS resourceId,
line_item_product_code AS productCode,
line_item_usage_account_id AS accountId
FROM ${process.env.ATHENA_CU_TABLE};`;
export const executeQueryAsync = async (sqlQuery) => {
const startQueryCommand = new StartQueryExecutionCommand({
QueryString: sqlQuery,
QueryExecutionContext: { Database: process.env.ATHENA_CU_DATABASE },
ResultConfiguration: { OutputLocation: process.env.ATHENA_OUTPUT_S3_BUCKET },
});
const startQueryResponse = await athenaClient.send(startQueryCommand);
const queryExecutionId = startQueryResponse.QueryExecutionId;
let queryExecutionStatus;
do {
const getQueryExecutionCommand = new GetQueryExecutionCommand({
QueryExecutionId: queryExecutionId,
});
const queryExecutionResponse = await athenaClient.send(getQueryExecutionCommand);
queryExecutionStatus = queryExecutionResponse.QueryExecution.Status.State;
if (queryExecutionStatus === "FAILED") {
console.error(`Query Failed: ${JSON.stringify(queryExecutionResponse)}`);
break;
}
await new Promise((resolve) => setTimeout(resolve, executionTimeout));
} while (queryExecutionStatus !== "SUCCEEDED");
return queryExecutionId;
}
export const retrieveResultsAsync = async (queryExecutionId) => {
const getQueryResultsCommand = new GetQueryResultsCommand({
QueryExecutionId: queryExecutionId,
});
const result = await athenaClient.send(getQueryResultsCommand);
if (!result || !result.ResultSet || !result.ResultSet.Rows) {
return {
statusCode: 404,
body: JSON.stringify([]),
};
}
let rows = result.ResultSet.Rows;
if (0 == rows.length || 1 == rows.length) {
return {
statusCode: 400,
body: JSON.stringify([]),
};
}
const columnNames = rows[0].Data.map(item => item.VarCharValue);
const items = [];
rows.slice(1).forEach(cells => {
const item = {};
for (let i = 0; i < cells.Data.length; i++) {
item[columnNames[i]] = cells.Data[i].VarCharValue;
}
items.push(item);
});
return items;
};