AWS IoT: Retrieving SQS Messages from a Queue using an IoT Rule and a Lambda function

AWS IoT Rules have a predefined action for sending a message to an SQS queue, but for some reason not one for retrieving a message from a queue (or is there? If you know how, leave me a comment). You can easily retrieve a message from a Lambda function using the AWS SDK APIs, though, and an IoT Rule can invoke a Lambda function, so let’s set that up.

First, create an IoT Rule that calls the Lambda function. We’ll trigger it with incoming MQTT messages on a topic called topic/checkForMessage:

Next, from the long list of available actions, choose ‘Invoke a Lambda function’:

Select the Lambda function we want to call. In this case it’s one I created earlier (it has to exist already to show in the list; if it doesn’t, press ‘Create a new Resource’ to create one):

On the summary screen that follows, press ‘Create Rule’ and you’re done.
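If you’d rather script the rule than click through the console, `aws iot create-topic-rule --rule-name <name> --topic-rule-payload file://rule.json` accepts the same rule as JSON. The sketch below builds that payload in Node.js. It is an illustration under assumptions: the function ARN you pass in is a placeholder, and the IoT SQL version string is worth checking against the current AWS IoT docs.

```javascript
// Sketch: build a topic rule payload for `aws iot create-topic-rule`.
// The Lambda function ARN passed in is a placeholder for your own.
function buildTopicRulePayload(topic, functionArn) {
  return {
    // Forward every message published on the topic to the rule's actions
    sql: `SELECT * FROM '${topic}'`,
    awsIotSqlVersion: '2016-03-23', // assumption: the current IoT SQL version
    ruleDisabled: false,
    actions: [
      // The 'Invoke a Lambda function' action chosen in the console
      { lambda: { functionArn: functionArn } }
    ]
  };
}
```

For example, `JSON.stringify(buildTopicRulePayload('topic/checkForMessage', arn), null, 2)` can be written to rule.json and passed to the CLI with `--rule-name RetrieveSQSMessageFromQueue`.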

To allow the IoT Rule to call the function, we need to grant the AWS IoT service the lambda:InvokeFunction permission on it.

Per the docs, we can use the AWS CLI to add the permission like this:

aws lambda add-permission \
  --function-name "function_name" \
  --region "region" \
  --principal iot.amazonaws.com \
  --source-arn arn:aws:iot:region:account_id:rule/rule_name \
  --source-account "account_id" \
  --statement-id "unique_id" \
  --action "lambda:InvokeFunction"

To apply this to our function and rule, replace:

“function_name”: “LightsOnReceiveMessageFromQueue”

“region”: “us-east-1”

source-arn: the full ARN for the rule (see below), which contains your region, account_id and rule_name

account_id: your AWS account id

rule_name: RetrieveSQSMessageFromQueue

“unique_id”: a unique id for this permission
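Building the rule ARN from the same region and account values used for `--region` and `--source-account` keeps the command consistent. A minimal Node.js sketch that assembles the argument list; every value passed in is a placeholder for your own, and the ARN format matches the `ruleArn` returned by `aws iot list-topic-rules`.

```javascript
// Sketch: assemble `aws lambda add-permission` arguments from the values above.
// All inputs are placeholders for your own account, region and names.
function ruleArn(region, accountId, ruleName) {
  return `arn:aws:iot:${region}:${accountId}:rule/${ruleName}`;
}

function addPermissionArgs({ functionName, region, accountId, ruleName, statementId }) {
  return [
    'lambda', 'add-permission',
    '--function-name', functionName,
    '--region', region,
    '--principal', 'iot.amazonaws.com',
    // Same region and account as above, so the rule ARN can't drift out of sync
    '--source-arn', ruleArn(region, accountId, ruleName),
    '--source-account', accountId,
    '--statement-id', statementId,
    '--action', 'lambda:InvokeFunction'
  ];
}
```

These can be run with `require('child_process').execFileSync('aws', addPermissionArgs({...}), { stdio: 'inherit' })`.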

I’m not sure the AWS IoT console shows the ARN for IoT Rules anywhere in its pages, but you can easily list it with the AWS CLI:

$ aws iot list-topic-rules
{
    "rules": [
        {
            "ruleArn": "arn:aws:iot:us-east-1:your-account-id:rule/RetrieveSQSMessageFromQueue",
            "ruleName": "RetrieveSQSMessageFromQueue",
            "topicPattern": "topic/checkForMessage",
            "createdAt": 1511115896.0,
            "ruleDisabled": false
        }
    ]
}
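To pick out one rule’s ARN without reading through the JSON by eye, a small Node.js helper can parse the CLI output. This is a sketch; `findRuleArn` is a hypothetical name.

```javascript
// Sketch: look up a rule's ARN in the JSON printed by `aws iot list-topic-rules`.
function findRuleArn(listTopicRulesJson, ruleName) {
  const { rules = [] } = JSON.parse(listTopicRulesJson);
  const rule = rules.find(r => r.ruleName === ruleName);
  return rule ? rule.ruleArn : null; // null if no rule has that name
}
```

The CLI can also filter directly with a JMESPath query: `aws iot list-topic-rules --query "rules[?ruleName=='RetrieveSQSMessageFromQueue'].ruleArn" --output text`.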

Plugging my values into the AWS CLI command adds the permission.

That’s it for the IoT Rule. To summarize, this allows us to:

  • respond to messages published by an AWS IoT device to an MQTT topic called topic/checkForMessage
  • when a message arrives from the device on the topic, it triggers the IoT Rule we just created
  • the rule invokes an AWS Lambda function, which pulls a message from an AWS SQS queue.

I’ll share more details on the implementation of the Lambda that interacts with the SQS queue, and of the Node.js app on a Raspberry Pi, in upcoming posts. You’re probably wondering what I’m building? Check back for my follow-up posts to find out!
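In the meantime, here is a rough sketch of what such a Lambda could look like. It is not the implementation from those posts. It assumes the AWS SDK for JavaScript v2, and the SQS client is passed in so the logic can be exercised without AWS. The Lambda’s role would also need sqs:ReceiveMessage and sqs:DeleteMessage on the queue. `receiveOneMessage` and the `QUEUE_URL` environment variable are hypothetical names.

```javascript
// Sketch: pull one message from an SQS queue (AWS SDK for JavaScript v2 style).
// `sqs` is expected to be an AWS.SQS client, injected so it can be stubbed.
async function receiveOneMessage(sqs, queueUrl) {
  const data = await sqs.receiveMessage({
    QueueUrl: queueUrl,
    MaxNumberOfMessages: 1, // only take the next message
    WaitTimeSeconds: 5      // long-poll briefly if the queue is empty
  }).promise();

  if (!data.Messages || data.Messages.length === 0) {
    return null; // nothing waiting in the queue
  }

  const message = data.Messages[0];
  // Delete it so it isn't redelivered once the visibility timeout expires
  await sqs.deleteMessage({
    QueueUrl: queueUrl,
    ReceiptHandle: message.ReceiptHandle
  }).promise();
  return message.Body;
}

// Hypothetical wiring in the Lambda runtime (QUEUE_URL is an assumed env var):
// const AWS = require('aws-sdk');
// exports.handler = async () => ({ body: await receiveOneMessage(new AWS.SQS(), process.env.QUEUE_URL) });
```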

This is the second post in a series on AWS and IoT.

Installing AWS CLI on MacOS 10.13

The AWS instructions to install the AWS CLI using Python and pip work on MacOS 10.13 (High Sierra) up to the point of adding the Python install location to your path – I found that on 10.13, following the steps didn’t result in the aws command being found.

At the step to add the Python install location to your path:

  • running ‘which python’ showed:
$ which python
/usr/bin/python

but, following the docs, ls -la showed that this was not a symbolic link in my install, so this is also not the location where pip installed the aws command.

This post has an answer suggesting the issue is that the AWS CLI instructions tell you to run:

pip3 install awscli --upgrade --user

but the --user option installs into a per-user location. To find where pip is installing to, run:

python3 -m site --user-base

This told me:

/Users/kev/Library/Python/3.6

Looking in the bin directory under this location, that’s where the aws CLI ended up. After adding this bin path (/Users/kev/Library/Python/3.6/bin) to my PATH in ~/.bash_profile, the aws command works as expected.
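The same lookup can be scripted. A small Node.js sketch, assuming `python3` is on your PATH and that pip’s `--user` installs put scripts in the `bin` subdirectory of the user base (as they did here); both function names are hypothetical.

```javascript
// Sketch: find pip's --user install base and build the PATH line for ~/.bash_profile.
// Assumes python3 is on the PATH; scripts land in <user-base>/bin.
const { execFileSync } = require('child_process');
const path = require('path');

function userBaseFromPython() {
  return execFileSync('python3', ['-m', 'site', '--user-base'], { encoding: 'utf8' }).trim();
}

function bashProfileLine(userBase) {
  // posix join because this is for a macOS shell profile
  return `export PATH=${path.posix.join(userBase, 'bin')}:$PATH`;
}
```

Running `console.log(bashProfileLine(userBaseFromPython()))` prints the line to append to ~/.bash_profile; with the user base above it would be `export PATH=/Users/kev/Library/Python/3.6/bin:$PATH`.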

Publishing a message from a webapp to an AWS SQS Queue via AWS Lambda

I’m building a simple project that needs to receive requests from a simple webpage and process them sequentially over time (more on this later!). An AWS SQS queue seems like a good fit for what I’m looking for. Rather than creating something heavyweight like a REST endpoint running in an EC2 instance, this also seemed like a good opportunity to look into calling an AWS Lambda function from a webpage. This has the benefit of not paying for an EC2 instance while it’s up but idle.

To get started I created an AWS SQS queue using the AWS Console (the name of the queue might give away what I’m working on 🙂).

I then created a Lambda function to post a message to the queue, using the script from this gist.
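The gist’s code isn’t reproduced here. As a rough idea of what such a function does, the sketch below (not the gist) resolves the queue URL from its name and then sends a message. It assumes the AWS SDK for JavaScript v2 with the SQS client passed in. The two calls are exactly why the policy needs sqs:GetQueueUrl and sqs:SendMessage. `postToQueue` and the message body are hypothetical.

```javascript
// Sketch: post a message to an SQS queue by name (AWS SDK for JavaScript v2 style).
// `sqs` is expected to be an AWS.SQS client; the body shape is a made-up example.
async function postToQueue(sqs, queueName, body) {
  // Resolve the queue URL from its name (needs sqs:GetQueueUrl)
  const { QueueUrl } = await sqs.getQueueUrl({ QueueName: queueName }).promise();
  // Send the message (needs sqs:SendMessage)
  const { MessageId } = await sqs.sendMessage({
    QueueUrl,
    MessageBody: JSON.stringify(body)
  }).promise();
  console.log(`messageId: ${MessageId}`);
  return MessageId;
}
```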

Testing the Lambda from the AWS Console I get this error:

2017-11-12T17:07:19.969Z error: Fail Send Message: AccessDenied: Access to the resource https://sqs.us-east-1.amazonaws.com/ is denied.
2017-11-12T17:07:20.007Z {"errorMessage":"error"}

Per this post, we need to update the default policy we added when creating the Lambda so that it includes permission to post messages to the queue. The missing permissions are sqs:SendMessage and sqs:GetQueueUrl on your SQS queue resource. Add this statement to the policy’s Statement array, inserting your queue’s ARN as the Resource:

{
  "Action": [
    "sqs:SendMessage",
    "sqs:GetQueueUrl"
  ],
  "Effect": "Allow",
  "Resource": "arn:aws:sqs:us-east-1:SOME_ID_HERE:test-messages"
}

Using the Saved Test Event, now we’re looking good!

2017-11-12T17:32:03.906Z	messageId: f04a...
END RequestId: 658a...
REPORT RequestId: 658a... Duration: 574.09 ms
Billed Duration: 600 ms
Memory Size: 128 MB
Max Memory Used: 42 MB

Let’s take a look in our queue from the SQS Management Console and see if our payload is there:

Now that our Lambda posts a message into our queue, how can we call it from a webpage using some JavaScript? The AWS docs have an example here, which also walks through configuring the AWS SDK to use a Cognito identity pool for unauthenticated access to call the Lambda. Step-by-step instructions for creating Cognito pools via the AWS Console are in the docs here. There seems to be a gap in the docs, though, as they don’t explicitly state how to create a Cognito pool for unauthenticated access.

Just out of curiosity, if you attempt to call your Lambda function without any authentication, you get an error that looks like this:

Error: Missing credentials in config
 at credError (bundle.js:10392)
 at Config.getCredentials (bundle.js:10433)
 at Request.VALIDATE_CREDENTIALS (bundle.js:11562)

Ok, so back to creating the Cognito pool. From the AWS Console, select Cognito. The option you need is ‘Manage Federated Identities’, which is where you create an identity pool that allows unauthenticated access:

Check the box: ‘Enable access to unauthenticated identities’:

Now we’re back to the AWS SDK for JavaScript, and can plug our Cognito identity pool id into this config:

AWS.config.update({region: 'REGION'});
AWS.config.credentials = new AWS.CognitoIdentityCredentials({IdentityPoolId: 'IdentityPool'});

My JavaScript to call the Lambda function so far looks like this:

var AWS = require('aws-sdk');

// Init AWS credentials with the unauthenticated Cognito identity pool
AWS.config.update({region: 'us-east-1'});
AWS.config.credentials = new AWS.CognitoIdentityCredentials({IdentityPoolId: 'pool-id-here'});

var lambda = new AWS.Lambda();

// Parameters for invoking the Lambda function synchronously
var lambdaCallParams = {
    FunctionName : 'LightsOnMessageToQueue',
    InvocationType : 'RequestResponse',
    LogType : 'None'
};

function callLambda() {
    lambda.invoke(lambdaCallParams, function(error, data) {
        if (error) {
            console.log(error);
        } else {
            // Payload holds the JSON returned by the Lambda function
            var result = JSON.parse(data.Payload);
            console.log(result);
        }
    });
}

module.exports = {
    callLambda: callLambda
};

Calling the JavaScript now, I get a different error:

assumed-role/Cognito_PostToQueueUnauthRole/CognitoIdentityCredentials
is not authorized to perform: lambda:InvokeFunction 
on resource: arn:aws:lambda:us-east-1:xxx:function:LightsOnMessageToQueue"}

The error is telling us that the ‘lambda:InvokeFunction’ permission is missing for the role Cognito_PostToQueueUnauthRole, so let’s go back and add it. The role was created when we stepped through the Cognito setup, but to edit it we need to go to the IAM section of the AWS Console. Searching for Lambda-related policies to include in this role, it looks like this is what we’re looking for:

We don’t want to grant InvokeFunction on all (*) resources, though, so we can copy the JSON for this policy into a new ‘inline policy’ on our role, and then edit it to specify the ARN for our function.
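Scoped down to one function, the inline policy document looks roughly like the object built below. This is a Node.js sketch; the account id in the example is a placeholder, and the output of `JSON.stringify(..., null, 2)` can be pasted into the IAM inline policy JSON editor.

```javascript
// Sketch: an inline IAM policy allowing the unauthenticated Cognito role
// to invoke only one Lambda function. Inputs are placeholders.
function lambdaInvokePolicy(region, accountId, functionName) {
  return {
    Version: '2012-10-17',
    Statement: [{
      Effect: 'Allow',
      Action: 'lambda:InvokeFunction',
      // A single function ARN instead of "*"
      Resource: `arn:aws:lambda:${region}:${accountId}:function:${functionName}`
    }]
  };
}
```

For example: `JSON.stringify(lambdaInvokePolicy('us-east-1', '123456789012', 'LightsOnMessageToQueue'), null, 2)`.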

Back to the JavaScript app, we can now see the SDK making several XHR requests to AWS, including a POST to /functions/LightsOnMessageToQueue/invocations returning with a 200.
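The callLambda function above only logs its result. One possible refinement is a promise-based wrapper that sends a JSON payload and hands the parsed response back to the caller. It would also treat a `FunctionError` in the response as a failure, because Lambda reports errors raised inside the function with a 200 status and an error payload, like the `{"errorMessage":"error"}` from the earlier console test. This is a sketch assuming the AWS SDK for JavaScript v2; `invokeJson` and the payload shape are hypothetical.

```javascript
// Sketch: promise-based wrapper around lambda.invoke (AWS SDK for JavaScript v2).
// `lambda` is an AWS.Lambda instance; the payload shape is a made-up example.
function invokeJson(lambda, functionName, payload) {
  const params = {
    FunctionName: functionName,
    InvocationType: 'RequestResponse',
    LogType: 'None',
    Payload: JSON.stringify(payload)
  };
  return lambda.invoke(params).promise().then(data => {
    const result = JSON.parse(data.Payload);
    // A successful HTTP response can still carry an error raised inside the function
    if (data.FunctionError) {
      throw new Error(`Lambda error: ${result.errorMessage}`);
    }
    return result;
  });
}
```

Usage: `invokeJson(lambda, 'LightsOnMessageToQueue', { light: 'on' }).then(console.log, console.error);`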

Checking the AWS Console, we’re now successfully making calls to our Lambda function, and messages are being posted to the queue:

Since my simple webpage is static content, it can easily be hosted on AWS S3. I created a new bucket, granted public read access, and enabled the ‘Static website hosting’ option:

To package the app for deployment, AWS provides a sample webpack.config.js here. I ran ‘npm run build’ and then uploaded index.html and bundle.js to my bucket.

So far this is one part of a project, I’ll post another update when I’ve made some progress on the next part.

 

Apache Spark word count – big data analytics with a publicly available data set (part 2)

In a previous post I discussed my findings for developing a simple word count app in Java, and running it against the text of a typical novel, such as Alice in Wonderland.

Here’s a summary of my findings so far:

  • 2012 MacBook Pro with a 2.4GHz i7 and SanDisk SSD it completed in 100ms
  • New 2015 MacBook Pro with a 2.2GHz i7 and stock SSD, it completed in 82ms
  • On an Ubuntu 16.04 server running under VMware ESXi on my HP DL380 Gen7 rack server, configured with 2 x 2.4 GHz vCPUs and 6GB RAM, it completed in 250ms (the slower time can probably be accounted for by slower data access from the WD Black 7200rpm HDDs in my server, which are likely an I/O bottleneck compared to loading the same data file from an SSD)

Now let’s take it up a notch and throw some larger datasets into the mix, using the Yelp dataset. I took the review text from the reviews.json file, exported it as a CSV file, and ran the same Java word count app against the resulting 2.8GB file on the same Ubuntu server VM on the DL380. This took 434,886ms to complete, or just over 7 minutes. Ok, now we’ve got something large enough to play with as a benchmark:

kev@esxi-ubuntu-mongodb1:~$ java -jar wordcount.jar ./data/yelp/reviewtext.csv

Incidentally, if you’re interested in what the word counts from the Yelp reviews.json data file look like, here are the first few counts in descending order, before we get to some terms you’d expect in Yelp reviews, like ‘good’ and ‘food’:

the : 22169071
and : 18025282
I : 13845506
a : 13416437
to : 12952150
was : 9340074
of : 7811688
 : 7600402
is : 6513076
for : 6207595
in : 5895711
it : 5604281
The : 4702963
that : 4446918
with : 4353165
my : 4188709
but : 3659431
on : 3642918
you : 3570824
have : 3311139
this : 3270666
had : 3015103
they : 3001066
not : 2829568
were : 2814656
are : 2660714
we : 2639049
at : 2624837
so : 2378603
place : 2358191
be : 2276537
good : 2232567
food : 2215236
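Two details stand out in this list. Counts are case-sensitive (‘the’ and ‘The’ are separate entries), and there is a blank entry with 7,600,402 occurrences. The blank entry is consistent with splitting lines on whitespace the way the Spark version later in this post does: in Java, `"".split("\\s+")` returns a single empty string, and a line that starts with whitespace yields a leading empty token. The Node.js sketch below is a word count in that spirit, not the author’s Java app. It streams the file line by line so a 2.8GB input never sits in memory, and it skips empty tokens.

```javascript
// Sketch: streaming, case-sensitive word count that skips empty tokens.
const fs = require('fs');
const readline = require('readline');

async function countWords(lines) {
  const counts = new Map();
  for await (const line of lines) { // works for arrays and readline interfaces
    for (const word of line.split(/\s+/)) {
      if (word === '') continue; // blank lines / leading whitespace yield ''
      counts.set(word, (counts.get(word) || 0) + 1);
    }
  }
  // Sort by descending count, like the listing above
  return [...counts.entries()].sort((a, b) => b[1] - a[1]);
}

function countWordsInFile(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity // treat \r\n as a single line break
  });
  return countWords(rl);
}
```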

Now let’s look at rewriting the analysis using Apache Spark. The equivalent code using the Spark API for loading the data set and performing the word count turned out to be like this (although if you search for Apache Spark word count, there are many different ways you could use the available APIs to implement a word count):

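// `spark` is the SparkSession and `filepath` the path to the input file
JavaRDD<String> lines = spark.read().textFile(filepath).javaRDD();

// Split each line on whitespace, emit (word, 1) pairs, then sum the counts per word
JavaPairRDD<String, Integer> words = lines
    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((x, y) -> x + y);

// collect() returns every (word, count) pair to the driver
List<Tuple2<String, Integer>> results = words.collect();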
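The reduceByKey step is what lets this parallelise. Each partition can combine its own (word, 1) pairs before the partial counts are shuffled and merged with the same `(x, y) -> x + y` function, so more cores or workers split the counting between them. The plain Node.js sketch below only illustrates that dataflow on a single thread. It is not how Spark is implemented, and the round-robin assignment of lines to partitions is a simplification chosen for the example.

```javascript
// Sketch: per-partition counting followed by a merge, mirroring reduceByKey's dataflow.
function countPartition(lines) {
  const counts = new Map();
  for (const line of lines) {
    for (const word of line.split(/\s+/)) {
      if (word) counts.set(word, (counts.get(word) || 0) + 1);
    }
  }
  return counts;
}

function mergeCounts(partials) {
  const total = new Map();
  for (const partial of partials) {
    for (const [word, n] of partial) {
      total.set(word, (total.get(word) || 0) + n); // same role as (x, y) -> x + y
    }
  }
  return total;
}

function partitionedWordCount(lines, numPartitions) {
  // Round-robin lines into partitions, as if spread across cores or workers
  const partitions = Array.from({ length: numPartitions }, () => []);
  lines.forEach((line, i) => partitions[i % numPartitions].push(line));
  return mergeCounts(partitions.map(countPartition));
}
```

Whatever the number of partitions, the merged counts come out the same; only how the work is divided changes.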

Submitting the job to run on a standalone Spark node (an Ubuntu Server 16.04 VM on ESXi) with 1 core (--master local[1]):

./bin/spark-submit --class "kh.textanalysis.spark.SparkWordCount" \
  --master local[1] ../spark-word-count-0.0.1-SNAPSHOT.jar \
  ../data/yelp/reviewtext.csv

… the job completes in 336,326ms, or approx 5.6 minutes. At this point I have minimal understanding of how best to structure an effective Spark job, but we’ve already made an improvement, and this first test uses a single local Spark node with no additional worker nodes in the cluster.

Next, with a standalone local node and 2 cores (--master local[2]):

170261ms, or 2.8 mins. Now we’re talking.

Let’s try deploying the master node, and then add some worker nodes.

17/11/07 18:43:33 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.1.82:7077…

17/11/07 18:43:33 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.1.82:7077

My first guess is that the default 7077 port is not open, so:

sudo ufw allow 7077
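To confirm from the submitting machine that the master port is reachable before and after the ufw change, a quick TCP connect test is enough. A minimal Node.js sketch; `isPortOpen` is a hypothetical helper and the 2-second timeout is an arbitrary choice.

```javascript
// Sketch: does host:port accept TCP connections? Resolves true or false.
const net = require('net');

function isPortOpen(host, port, timeoutMs = 2000) {
  return new Promise(resolve => {
    const socket = net.connect({ host, port });
    const done = open => {
      socket.destroy();
      resolve(open);
    };
    socket.setTimeout(timeoutMs, () => done(false)); // firewalled ports often just time out
    socket.once('connect', () => done(true));
    socket.once('error', () => done(false));         // e.g. ECONNREFUSED
  });
}
```

For the master above: `isPortOpen('192.168.1.82', 7077).then(console.log);`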

Retrying, the job now submits and starts up OK, but logs this warning:

17/11/07 18:47:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 22 tasks

17/11/07 18:47:44 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Taking a look at the master node web ui on 8080:

True, we have the master started but no workers yet, so let’s start up 1 slave node first (another Ubuntu Server 16.04 VM on ESXi):

$ ./start-slave.sh -m 1G spark://192.168.1.82:7077

starting org.apache.spark.deploy.worker.Worker, logging to /home/kev/spark-2.2.0-bin-hadoop2.7/logs/spark-kev-org.apache.spark.deploy.worker.Worker-1-ubuntuspk1.out

Now we’ve got 1 worker up and ready:

Submitting the same job again to the master node, there’s now a FileNotFoundException because the worker can’t find the file we’re attempting to process:

Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 192.168.1.89, executor 0): java.io.FileNotFoundException: File file:/home/kev/data/yelp/reviewtext.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running ‘REFRESH TABLE tableName’ command in SQL or by recreating the Dataset/DataFrame involved.

I was curious how the data file would be shared with the slave nodes, and this answers my question: I guess it isn’t (or the way I’ve implemented my job so far assumes the data file is local). Clearly I have some more work to do to work out how to share the data file between the nodes. In the meantime, I’m just going to copy the same file to each of the worker nodes to get something up and running. I’m guessing a better way would be to mount a shared drive on each of the workers, but I’ll come back to this later.

With the data file copied to my worker VMs and the job restarted, we’ve now got it running on 1 slave with 1 core and 1GB:

Here’s an interesting view of the processing steps in my job:

Taking a quick look at the HP server’s iLO, the CPUs are barely breaking a sweat and fan speeds are still low:

The Spark dashboard shows completion times, so now we’re at 5.3 mins:

Let’s add an extra vCPU to the first worker node (I need to reconfigure my VM in ESXi and then restart). The console’s showing an additional stopped worker from a second slave VM that I’ll start up for a later test. First, 2 vCPUs, starting the worker with -c 2 for 2 cores:

./start-slave.sh -c 2 -m 1G spark://192.168.1.82:7077

166773ms, or approx 2.8 minutes. Looking good!

Now with the additional second slave node, also started with 2 vCPUs, so now we have 2 worker slave nodes, 4 vCPUs total:

During the middle of the run, checking fan speeds – warming up and fans are running a little faster, but nothing crazy so far:

102977ms, or 1.7 minutes!

Curious how far we can go, let’s try 2 worker slave nodes with 4 vCPUs each, and bump up the available RAM to 3GB. Reconfigure my VMs, and off we go:

./start-slave.sh -c 4 -m 3G spark://192.168.1.82:7077

81998ms, or 1.4 minutes!

Still pretty good, although the gains from doubling the cores per worker and adding more RAM seem to be leveling off: this run only cut the time from 102,977ms to 81,998ms, about 20%, so at this point it might be more interesting to add more slave worker nodes. I’ve been creating my ESXi VMs by hand up until this point, so maybe this is the time to look into some automation for spinning up multiple copies of the same VM.

Let’s summarize the results so far, running the standalone Java 8 app on the same HP server as Spark, and then comparing the various Spark configurations so far:
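The timings reported in this post can also be compared directly. This small Node.js sketch computes minutes and speedup relative to the standalone Java app. It leaves out the 1-worker/1-core run, because only an approximate time (5.3 mins) was recorded for it.

```javascript
// Sketch: speedups relative to the standalone Java app, using the timings in this post.
const runs = [
  { config: 'Standalone Java app', ms: 434886 },
  { config: 'Spark local[1]', ms: 336326 },
  { config: 'Spark local[2]', ms: 170261 },
  { config: '1 worker, 2 cores, 1GB', ms: 166773 },
  { config: '2 workers, 2 cores each', ms: 102977 },
  { config: '2 workers, 4 cores each, 3GB', ms: 81998 }
];

function summarize(runs) {
  const baseline = runs[0].ms; // the standalone Java app
  return runs.map(({ config, ms }) => ({
    config,
    minutes: (ms / 60000).toFixed(1),
    speedup: (baseline / ms).toFixed(2)
  }));
}
```

`console.table(summarize(runs))` prints the comparison. The final configuration works out to roughly 5.3x the standalone app, but only about 1.26x the 2-workers/2-cores run before it.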

I’ve got ample resources left on my HP DL380 G7 rack server to run a few more VMs of the same size as the ones I have running, so if I can work out an easy way to template my existing VMs, I’ll spin up some additional VMs as worker nodes and see what the fastest processing time is that I can get with the hardware I have. More updates to come later.