diff --git a/queries.js b/queries.js new file mode 100644 index 0000000..73d5aae --- /dev/null +++ b/queries.js @@ -0,0 +1,8 @@ +import dotenv from "dotenv"; +dotenv.config(); + +export const productQuery = `SELECT DISTINCT + line_item_resource_id AS resourceId, + line_item_product_code AS productCode, + line_item_usage_account_id AS accountId +FROM ${process.env.ATHENA_CU_TABLE};`; \ No newline at end of file diff --git a/server.js b/server.js index 0cbb97c..3172a54 100644 --- a/server.js +++ b/server.js @@ -1,12 +1,10 @@ import Fastify from "fastify"; import sequelizePlugin from "./plugins/sequelize.js"; import dotenv from "dotenv"; +import { executeQueryAsync, retrieveResultsAsync } from "./services/athena.js"; dotenv.config(); +import * as queries from "./queries.js"; -import { S3Client, ListBucketsCommand } from "@aws-sdk/client-s3"; -const s3Client = new S3Client({ region: process.env.AWS_REGION, profile: 'default' }); -const listCommand = new ListBucketsCommand({}); -const response = await s3Client.send(listCommand); const server = Fastify({ logger: true }); server.register(sequelizePlugin); @@ -17,49 +15,13 @@ server.get("/", async (request, reply) => { return { hello: "world" }; }); -const productQuery = `SELECT DISTINCT - line_item_resource_id AS resourceId, - line_item_product_code AS productCode, - line_item_usage_account_id AS accountId -FROM ${process.env.ATHENA_CU_TABLE};`; - -import { AthenaClient, StartQueryExecutionCommand, GetQueryExecutionCommand, GetQueryResultsCommand } from "@aws-sdk/client-athena"; -const athenaClient = new AthenaClient({ region: process.env.AWS_REGION, profile: 'default' }); - -const startQueryCommand = new StartQueryExecutionCommand({ - QueryString: productQuery, - QueryExecutionContext: { Database: process.env.ATHENA_CU_DATABASE }, - ResultConfiguration: { OutputLocation: process.env.ATHENA_OUTPUT_S3_BUCKET }, +server.get("/products", async (request, reply) => { + const query = queries.productQuery; + const queryExecutionId = await executeQueryAsync(query); + const results = await retrieveResultsAsync(queryExecutionId); + return { results }; }); -const startQueryResponse = await athenaClient.send(startQueryCommand); -const queryExecutionId = startQueryResponse.QueryExecutionId; - -let queryExecutionStatus; -do { - const getQueryExecutionCommand = new GetQueryExecutionCommand({ - QueryExecutionId: queryExecutionId, - }); - - const queryExecutionResponse = await athenaClient.send(getQueryExecutionCommand); - queryExecutionStatus = queryExecutionResponse.QueryExecution.Status.State; - - if (queryExecutionStatus === "FAILED") { - console.error(`Query Failed: ${JSON.stringify(queryExecutionResponse)}`); - break; - } - - await new Promise((resolve) => setTimeout(resolve, 5000)); -} while (queryExecutionStatus !== "SUCCEEDED"); - -const getQueryResultsCommand = new GetQueryResultsCommand({ - QueryExecutionId: queryExecutionId, -}); - -console.log(getQueryResultsCommand); -const result = await athenaClient.send(getQueryResultsCommand); -console.log(result.ResultSet.Rows); - try { await server.listen({ port: 3000 }) } catch (err) { diff --git a/services/athena.js b/services/athena.js new file mode 100644 index 0000000..9cd88d3 --- /dev/null +++ b/services/athena.js @@ -0,0 +1,70 @@ + +import { AthenaClient, StartQueryExecutionCommand, GetQueryExecutionCommand, GetQueryResultsCommand } from "@aws-sdk/client-athena"; +const athenaClient = new AthenaClient({ region: process.env.AWS_REGION, profile: 'default' }); + +const executionTimeout = 1000; + +const productQuery = `SELECT DISTINCT + line_item_resource_id AS resourceId, + line_item_product_code AS productCode, + line_item_usage_account_id AS accountId +FROM ${process.env.ATHENA_CU_TABLE};`; + +export const executeQueryAsync = async (sqlQuery) => { + const startQueryCommand = new StartQueryExecutionCommand({ + QueryString: sqlQuery, + QueryExecutionContext: { Database: process.env.ATHENA_CU_DATABASE }, + ResultConfiguration: { OutputLocation: process.env.ATHENA_OUTPUT_S3_BUCKET }, + }); + + const startQueryResponse = await athenaClient.send(startQueryCommand); + const queryExecutionId = startQueryResponse.QueryExecutionId; + + let queryExecutionStatus; + do { + const getQueryExecutionCommand = new GetQueryExecutionCommand({ + QueryExecutionId: queryExecutionId, + }); + const queryExecutionResponse = await athenaClient.send(getQueryExecutionCommand); + queryExecutionStatus = queryExecutionResponse.QueryExecution.Status.State; + + if (queryExecutionStatus === "FAILED") { + console.error(`Query Failed: ${JSON.stringify(queryExecutionResponse)}`); + break; + } + await new Promise((resolve) => setTimeout(resolve, executionTimeout)); + } while (queryExecutionStatus !== "SUCCEEDED"); + + return queryExecutionId; +} + +export const retrieveResultsAsync = async (queryExecutionId) => { + const getQueryResultsCommand = new GetQueryResultsCommand({ + QueryExecutionId: queryExecutionId, + }); + + const result = await athenaClient.send(getQueryResultsCommand); + if (!result || !result.ResultSet || !result.ResultSet.Rows) { + return { + statusCode: 404, + body: JSON.stringify([]), + }; + } + let rows = result.ResultSet.Rows; + if (0 == rows.length || 1 == rows.length) { + return { + statusCode: 400, + body: JSON.stringify([]), + }; + } + const columnNames = rows[0].Data.map(item => item.VarCharValue); + const items = []; + rows.slice(1).forEach(cells => { + const item = {}; + for (let i = 0; i < cells.Data.length; i++) { + item[columnNames[i]] = cells.Data[i].VarCharValue; + } + items.push(item); + }); + return items; +};