### Redshift Streaming Ingestion
Run the CloudFormation template found here: https://tinyurl.com/zxv5b8jk

The CloudFormation stack creates a Redshift cluster, a Lambda function, a Kinesis data stream (KDS), and an EventBridge rule that invokes the Lambda function.

#### Check KDS is working

1. Log in to the AWS Console --> Services --> search for and select Kinesis Data Streams.
1. Click the data stream named "cust-payment-txn-stream" to open its details page.
1. Click the Monitoring tab and scroll down to check the CloudWatch metrics for activity.
1. Click the Data viewer tab, select one of the shards, and click the Get records button to view the data.

#### Create External Schema

Create an external schema to map the data from Kinesis to a Redshift object.

In [0]:
CREATE EXTERNAL SCHEMA custpaytxn
FROM KINESIS
IAM_ROLE  default;
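
The statement above relies on a default IAM role being associated with the cluster. If no default role is set, a variant that names the role explicitly might look like the sketch below. The schema name `custpaytxn_explicit_role` is hypothetical, and the ARN is a placeholder for a role in your own account that can read the stream.

```sql
-- Sketch only: the schema name and role ARN are placeholders, not values from this lab.
CREATE EXTERNAL SCHEMA custpaytxn_explicit_role
FROM KINESIS
IAM_ROLE 'arn:aws:iam::<account-id>:role/<role-name>';
```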

#### Check available streams
You can list the available streams by querying the SVV_EXTERNAL_TABLES view.

In [0]:
--verify available streams 
select * from SVV_EXTERNAL_TABLES;

#### Create the Streaming Ingestion Object

Create a materialized view to consume the stream data. There are two ways to define a materialized view that ingests JSON source data: parse the individual fields into typed columns, or store each stream record in the semi-structured SUPER format.

The example below takes the first approach and parses the JSON data into columns.

The SQL extracts each field from the payload and keeps only records that are UTF-8 encoded and contain valid, parsable JSON.

In [0]:
CREATE MATERIALIZED VIEW cust_payment_tx_stream
AUTO REFRESH YES
AS
SELECT approximate_arrival_timestamp,
       partition_key,
       shard_id,
       sequence_number,
       json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'TRANSACTION_ID')::bigint AS TRANSACTION_ID,
       json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'TX_DATETIME')::character(50) AS TX_DATETIME,
       json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'CUSTOMER_ID')::int AS CUSTOMER_ID,
       json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'TERMINAL_ID')::int AS TERMINAL_ID,
       json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'TX_AMOUNT')::decimal(18,2) AS TX_AMOUNT,
       json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'TX_TIME_SECONDS')::int AS TX_TIME_SECONDS,
       json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'TX_TIME_DAYS')::int AS TX_TIME_DAYS
FROM custpaytxn."cust-payment-txn-stream"
WHERE is_utf8(kinesis_data) AND can_json_parse(kinesis_data);
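
For comparison, the SUPER-based alternative mentioned earlier could be sketched as follows. This is a minimal illustration, not part of the original lab: the view name `cust_payment_tx_stream_super` and the column alias `payload` are hypothetical, and the sketch assumes every record in the stream is a JSON document.

```sql
-- Sketch only: stores each record as a single SUPER value instead of typed columns.
-- The view name and the "payload" alias are hypothetical.
CREATE MATERIALIZED VIEW cust_payment_tx_stream_super
AUTO REFRESH YES
AS
SELECT approximate_arrival_timestamp,
       partition_key,
       shard_id,
       sequence_number,
       JSON_PARSE(kinesis_data) AS payload
FROM custpaytxn."cust-payment-txn-stream"
WHERE CAN_JSON_PARSE(kinesis_data);
```

This keeps the view definition independent of the record layout, at the cost of extracting fields at query time. Because the keys in this stream are uppercase, reading them through SUPER navigation may require case-sensitive identifiers to be enabled.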

Auto refresh takes some time to kick in; allow up to a minute for the first run.
You can also refresh the materialized view manually.

In [0]:
REFRESH MATERIALIZED VIEW cust_payment_tx_stream;

In [0]:
--sample the records
SELECT * FROM cust_payment_tx_stream LIMIT 10;

In [0]:
--count total number of records
SELECT COUNT(*) AS stream_rec_count FROM cust_payment_tx_stream;

In [0]:
--check throughput (records per second of arrival)
SELECT
    CAST(approximate_arrival_timestamp AS date) AS arrival_date,
    EXTRACT(hour FROM approximate_arrival_timestamp) AS arrival_hour,
    EXTRACT(minute FROM approximate_arrival_timestamp) AS arrival_minute,
    EXTRACT(second FROM approximate_arrival_timestamp) AS arrival_second,
    COUNT(*) AS record_count
FROM cust_payment_tx_stream
GROUP BY 1, 2, 3, 4
ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC;
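
Alongside throughput, it can be useful to see how far the view trails the stream. The query below is a rough sketch of such a check. It assumes `approximate_arrival_timestamp` and `GETDATE()` are both in UTC, and the column aliases are illustrative.

```sql
-- Sketch only: approximate lag between the newest ingested record and the current time.
SELECT MAX(approximate_arrival_timestamp) AS latest_arrival,
       DATEDIFF(second, MAX(approximate_arrival_timestamp), GETDATE()) AS seconds_behind
FROM cust_payment_tx_stream;
```

If the producer has stopped sending records, `seconds_behind` keeps growing even though the view is fully refreshed, so read it together with the throughput query.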

#### Log Tables

In [0]:
SELECT * FROM stv_mv_info WHERE name = 'cust_payment_tx_stream' LIMIT 10;
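
`stv_mv_info` shows the current state of the view. To see the history of individual refreshes, a query against `SVL_MV_REFRESH_STATUS` along the following lines may help. This is a sketch that assumes a provisioned cluster, as created by the CloudFormation template.

```sql
-- Sketch only: most recent refresh attempts for the streaming materialized view.
SELECT mv_name,
       starttime,
       endtime,
       status,
       refresh_type
FROM svl_mv_refresh_status
WHERE mv_name = 'cust_payment_tx_stream'
ORDER BY starttime DESC
LIMIT 10;
```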