Wednesday, June 26, 2019

Azure IoT

This is a post for people who are exploring Azure IoT for the first time. The idea is to stream data from code, push it into Azure, and see it through visualizations in Power BI.
To do this, we will need the following Azure resources.
1. IoT Hub
2. Event Hub
3. Stream Analytics
4. Service Bus
5. Logic Apps
6. SQL Server Database

The data flow diagram will be as shown in the image below.




Steps -

1. Configure IoT hub
2. Write code in Python which will simulate sending data to IoT Hub
3. Configure Event Hub and add it in the message routing
4. Configure Stream Analytics with Event hub as an input
5. Configure SQL Server
6. Configure Service Bus
7. Configure SQL Server and Service Bus as two outputs for Stream Analytics
8. Create a Logic App based on the SQL Server database tables and create a work order table


Step 1 - Create and Configure IoT Hub -
a) Choose the IoT Hub service in Azure and click on Create. Once you do this, you will be taken to a screen like the screenshot below -



b) After the IoT Hub service is provisioned, you should be able to see something similar to the screenshot below -
  
c) Now we need to create a sample IoT Device as shown in the snapshot below. We will be streaming our data using the connection string of this IoT Device.
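As a small optional sketch (the environment variable name IOT_DEVICE_CONNECTION_STRING is an illustrative choice, not something the portal gives you), you can keep the device connection string out of the source code and read it at runtime:

import os

# Hypothetical environment variable; set it to the device connection string copied from
# the IoT Device blade (it looks like HostName=...;DeviceId=...;SharedAccessKey=...)
CONNECTION_STRING = os.environ.get("IOT_DEVICE_CONNECTION_STRING", "<IoT Hub Connection String>")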

d) Configure the message routing once you create the Event Hub.


Step 2 - Write code in Python which will simulate sending data to IoT Hub:

a) Pick up the connection string into a variable as stated in Step 1c.
b) Initialize the IoT Hub connection.
c) Format the string to simulate sensor data.
d) Send the data to IoT Hub.
e) Check the status of the reply.
f) Check the status in IoT Hub.



import os
import datetime
import time
import random

import iothub_client
from iothub_client import IoTHubClient, IoTHubClientError, IoTHubTransportProvider, IoTHubClientResult
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError, DeviceMethodReturnValue




CONNECTION_STRING = '<IoT Hub Connection String>'

sensorlist = [
    'Engine RPM'
]


AssetList = [
    'Device 1',
    'Device 2'
]

EngineVoltage = 233

sensorval_dict = {
    "Engine RPM": 5000
}


PROTOCOL = IoTHubTransportProvider.HTTP
MESSAGE_TIMEOUT = 10000

def iothub_client_init():
    # Create an IoT Hub client
    client = IoTHubClient(CONNECTION_STRING, PROTOCOL)
    return client

def send_confirmation_callback(message, result, user_context):
    print ( "IoT Hub responded to message with status: %s" % (result) )
    
    
def sensorDataParser(SensorData):
    attributes = SensorData.split(',')
    DeviceID = attributes[0]
    Parameter = attributes[1]
    Datetimestamp = attributes[2]
    Datetime = datetime.datetime.fromtimestamp(int(Datetimestamp)/1000)
    Val = attributes[3]
    #TypeofParam= attributes[4] Commented as of today
    MSG_TXT="{\"DeviceID\" : \""+ DeviceID +"\", \"Parameter\" : \""+ Parameter +"\", \"Datetimestamp\" : \""+ str(Datetime) +"\", \"Val\" : \""+ Val +"\"}"
    return MSG_TXT     


# Handle direct method calls from IoT Hub
def device_method_callback(method_name, payload, user_context):
    global INTERVAL
    print ( "\nMethod callback called with:\nmethodName = %s\npayload = %s" % (method_name, payload) )
    device_method_return_value = DeviceMethodReturnValue()
    if method_name == "SetTelemetryInterval":
        try:
            INTERVAL = int(payload)
            # Build and send the acknowledgment.
            device_method_return_value.response = "{ \"Response\": \"Executed direct method %s\" }" % method_name
            device_method_return_value.status = 200
        except ValueError:
            # Build and send an error response.
            device_method_return_value.response = "{ \"Response\": \"Invalid parameter\" }"
            device_method_return_value.status = 400
    else:
        # Build and send an error response.
        device_method_return_value.response = "{ \"Response\": \"Direct method not defined: %s\" }" % method_name
        device_method_return_value.status = 404
    return device_method_return_value


def iothub_client_connector():

    try:
        client = iothub_client_init()
        print ( "Successfully connected with the IoT Hub" )
        #client.set_device_method_callback(device_method_callback, None)
        
        while True:
            # to be changed later when the streaming comes from sensor data generator
            
            #to obtain random values from the list above
            now = datetime.datetime.now()
            datetimeval = int(datetime.datetime.timestamp(now)) * 1000
            Randomasset = random.choice(AssetList)
            #print(Randomasset)
            Randomsensor =random.choice(sensorlist)
            #print(Randomsensor)
            Randomsensorval = sensorval_dict.get(Randomsensor)
            #print("The value for the sensor "+Randomsensor+" is "+ str(Randomsensorval))
            senseval = Randomsensorval + (random.random() * 15)
            
            #formation of the string
            SampleString = Randomasset+","+Randomsensor+","+str(datetimeval)+","+str(senseval)+",Control Analog"
            
            #breaking of the string
            devicereading=sensorDataParser(SampleString)
            print("Able to generate the message "+ devicereading)
            
            #sending the string to IoTHub
            message=IoTHubMessage(devicereading)
            
            #Check for success
            client.send_event_async(message, send_confirmation_callback, None)
            time.sleep(1)
            
    except IoTHubError as iothub_error:
        print ( "Unexpected error %s from IoTHub" % iothub_error )
        return
    except KeyboardInterrupt:
        print ( "IoTHubClient sample stopped" )


if __name__ == '__main__':
    iothub_client_connector()

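Please note that the iothub_client package used above is the older Azure IoT Python SDK and can be difficult to install today. As a rough sketch only (this is not the code used for this post), the same send loop with the newer azure-iot-device package (pip install azure-iot-device) would look roughly like this, with the payload string standing in for the output of sensorDataParser:

import time
from azure.iot.device import IoTHubDeviceClient, Message

CONNECTION_STRING = '<IoT Hub Connection String>'

# Create the device client from the device connection string and open the connection
client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
client.connect()

try:
    while True:
        # Illustrative payload in the same JSON shape produced by sensorDataParser above
        payload = '{"DeviceID" : "Device 1", "Parameter" : "Engine RPM", "Datetimestamp" : "2019-06-19 11:53:26", "Val" : "5007.3"}'
        client.send_message(Message(payload))
        print("Message sent: " + payload)
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopped sending messages")
finally:
    client.disconnect()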

Step 3 - Configure Event Hub and add it in the message routing

a) Search for Event Hubs and click on Add; you will be taken to the screen shown below.
Choose a name and enable Kafka (please note this is available in the Standard tier and above; it is not available in the Basic tier).

b) Once you create it, you should be able to see something like the image below.

c) Click on the Event Hubs namespace and you can see its details. Click on Add Event Hub and provide the name; also choose the number of partitions and the number of days for which messages should be retained.

Step 4 - Configure Stream Analytics with Event hub as an input

a) Search for the Stream Analytics jobs service in the Azure portal and click on Add; you should be looking at a screen as per the screenshot below -


b) Once this is done, configure the Event Hub as the input of data. You will have other options such as IoT Hub, but in this case we will be using the Event Hub as the input.

    You need to configure the below -
    Input Alias: The name you want to provide for the input.
    You can choose the option: "Select Event Hub from your Subscriptions"  - This will allow you to choose the Event hub which we have created in the previous step.
    Subscription: Choose the subscription under which the Event hub is created.
    Event Hub Namespace: Choose the event hub namespace from the dropdown.
    Event Hub Name: Choose the option of "Use Existing" and choose the name of the event hub you have created in Step 3.
    Event Hub Policy Name: I have chosen RootManageSharedAccessKey.
    Event Hub Policy Key: This should get autopopulated.
    Event Hub Consumer Group: I will be leaving this blank so that it uses the $Default consumer group of the Event Hub.
    Event Serialization Format: I will be choosing JSON as I have formatted the data as a JSON in Step 2.
    Encoding: I will leave it as default which is UTF8
    Event Compression Type: None (since it is just an example, but you can use gzip or deflate option in case you are compressing large streaming data)
    
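Before moving on, you can optionally confirm that events are actually landing in the Event Hub. A minimal sketch using the azure-eventhub Python package (the connection string and event hub name placeholders below are values you need to fill in from Step 3; this is an extra check, not part of the original setup):

from azure.eventhub import EventHubConsumerClient

CONN_STR = "<Event Hub namespace connection string>"
EVENT_HUB_NAME = "<event hub name>"

def on_event(partition_context, event):
    # Print each event so you can confirm the JSON payload is flowing through
    print("Partition %s: %s" % (partition_context.partition_id, event.body_as_str()))

client = EventHubConsumerClient.from_connection_string(
    CONN_STR, consumer_group="$Default", eventhub_name=EVENT_HUB_NAME)

with client:
    # starting_position="-1" reads from the beginning of the stream
    client.receive(on_event=on_event, starting_position="-1")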

c)  Once this is done, we will need to configure the output and subsequently the query to obtain the data from Event hub and store it in SQL Server database.


Step 5 - Configure SQL Server
a) Search for SQL Server databases and then click on +Add which will take you to a screen as shown in the image below:
The first tab will be Basic:
Subscription: Choose the subscription in which you want the database to be created.
Resource Group: Choose the Resource group

Database Details:
Database Name: Enter the name of the database
Server: If you do not have a SQL Server resource, click on Create new and you should see the section shown below.
Once the server name is added, let's continue provisioning the Azure SQL database.
Want to use SQL elastic pool: Yes (if you expect to have multiple databases and want to manage all of them within a certain cost.)
Compute + storage: General Purpose


Adding new Server:
Server name: Provide the name of the server of your choice. Please note that .database.windows.net will be suffixed to the name you provide.
Server Admin Login: Provide the login username of your choice
Password: <Strong password>
Confirm Password: <confirm your Strong password>
Location: The region in which your database should exist. Please ensure it is in the same region as the other services, otherwise there will be a cost for transferring data across regions.
Allow Azure services to access server: Check this as we require Azure services to access this database.


The second tab is Additional settings:
You can leave everything as the default.
Use existing data: None (if you want to start fresh; if you have existing data and want to start with that, you can choose Backup).
Database Collation: You can leave it as the default unless you have a specific collation type which you use in your PoC or organisation.


Once this is done click on Review + Create and then click on Create.

Open SQL Server Management Studio v17 or above and connect to the cloud database with the server name, username, and password you provided.

Once you are successfully able to open the database, create the tables using the script below. DemoIot is the table which will hold the streaming data from the devices in this PoC, and Fact_ExceptionRecords will hold the flagged records written in Step 8.


SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

CREATE TABLE [dbo].[DemoIot](
    [DeviceID] [nvarchar](50) NULL,
    [Parameter] [nvarchar](50) NULL,
    [Datetimestamp] [datetime] NULL,
    [val] [float] NULL,
    [ROWID] [int] IDENTITY(1,1) NOT NULL,
 CONSTRAINT [PK_DemoIot_1] PRIMARY KEY CLUSTERED
(
    [ROWID] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

CREATE TABLE [dbo].[Fact_ExceptionRecords](
    [DeviceID] [nvarchar](100) NULL,
    [Date] [datetime] NULL,
    [Type] [nvarchar](100) NULL,
    [Description] [nvarchar](300) NULL
) ON [PRIMARY]
GO

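Once the tables exist, you can later sanity-check that rows are arriving from Stream Analytics. A minimal sketch using the pyodbc package (the server, database, and credential placeholders are assumptions you need to replace with your own values):

import pyodbc

# Placeholders - replace with the Azure SQL server, database, and credentials created above
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<your server>.database.windows.net;"
    "DATABASE=<your database>;"
    "UID=<username>;PWD=<password>"
)

cursor = conn.cursor()
cursor.execute(
    "SELECT TOP 10 DeviceID, Parameter, Datetimestamp, val "
    "FROM dbo.DemoIot ORDER BY ROWID DESC"
)
for row in cursor.fetchall():
    print(row)
conn.close()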


Step 6 - Configure Service Bus

a) Search for the Service Bus service in the Azure portal and click on +Add; once you do this, you will be able to see the screen as per the image below.

Provide the below details:
Name: The name of the service bus.
Pricing Tier: I will be choosing Basic
Subscription: Choose the subscription under which you want the resource to be present.
Resource Group: Choose the resource group under which you want the resource to be present or create a new one.
Location: Choose the region in which the Service Bus service should be present. Please ensure that the region is the same as the other services you have created; if not, there can be a cost associated with transferring data across regions.


b) Create a queue in the service bus:
Click on the Service Bus name and you should be able to see the screen as per the screenshot below.

Name: The name of the queue.
Max Queue Size: Leave it at the default of 1 GB if you are using this for learning purposes; otherwise modify it according to the volume of data that you are expecting.
Message Time to Live: This determines the time frame for which the messages will stay in the queue.
Lock Duration: The duration for which a message is locked so that only one receiver has access to it; once the time limit is reached, the lock is released and the message becomes available for other receivers to lock.
Enable Duplicate Detection: This will check if the same message is already present in the queue and will not allow the message to be added if it detects a duplicate.
Enable Sessions: This ensures a first-in-first-out policy for the data, as sessions guarantee ordering of the messages in the queue.

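Later, once Stream Analytics starts routing exception messages into this queue (Step 7), you can peek at them from Python. A minimal sketch using the azure-servicebus package (the connection string and queue name placeholders are values from your own namespace):

from azure.servicebus import ServiceBusClient

CONN_STR = "<Service Bus connection string>"
QUEUE_NAME = "<queue name>"

with ServiceBusClient.from_connection_string(CONN_STR) as client:
    with client.get_queue_receiver(queue_name=QUEUE_NAME) as receiver:
        # Pull up to 10 messages, waiting at most 5 seconds
        for msg in receiver.receive_messages(max_message_count=10, max_wait_time=5):
            print(str(msg))
            # Remove the message from the queue once it has been processed
            receiver.complete_message(msg)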

Step 7 - Configure SQL Server and Service Bus as two outputs for Stream Analytics
Go to the Stream Analytics job which you created in Step 4 and click on the resource; you should be able to see Outputs. Click on this and you will see the list of resources which you can choose as outputs. The options available at the time this article was written are -

    1) Event Hub
    2) SQL Database
    3) Table Storage
    4) Service Bus Topic
    5) Service Bus Queue
    6) Cosmos DB
    7) Power BI
    8) Data Lake Store Gen 1
    9) Azure Function

For this post, let us consider the SQL Server database and the Service Bus queue which were created in Steps 5 and 6 respectively.

a) Configure SQL Database as the output.
When you click on SQL Database, you will be shown a configuration screen as per the screenshot below -


Output Alias: The name you want to provide for the SQL Database output in the Stream Analytics job.
"Select SQL Database from your Subscriptions": Choose this, since you have already created the SQL Server database in Step 5.
Subscription: Choose the subscription under which you have created the SQL Server database.
Database: The name of the database which you have created
Username: The username to access the database
Password: The password to access the database
Table: The table in the database for which you executed the script.
Merge all input partitions into a single writer: Default, you can leave it as it is.
Max Batch Count: 10000. You can leave it as the default unless you expect there to be more records.

b) Configure Service Bus as the output -
For Service Bus, there are two options -
Service Bus Topic
Service Bus Queue
For this post, I will consider Service Bus Queue.

Below are config details for the service bus as output -
Output Alias: The name of the output for Service Bus Queue in stream analytics jobs.
Select queue from your Subscription: Use this option to ensure that you do not end up creating a new Service Bus queue, as we have already created one in Step 6.
Subscription: The subscription under which you created the Service Bus.
Service Bus Namespace: The namespace of the service bus.
Queue Name: Use Existing: This is to ensure you use the Service Bus queue which you have already created.
Queue Policy Name: RootManageSharedAccessKey
Queue Policy Key: This will be autopopulated and is not editable.
Property Columns: If you want some custom values to go to service bus, you can provide the names as a comma separated values. This is not required now and hence we will keep it blank.
Event Serialization Format: JSON
Encoding: UTF8
Format: Line Separated


c) Once this is done, go to the Stream Analytics job; you will see a query window. Click on Edit query and add a sample query like the one below. The aliases [eventhubinput], [SQLOutput], and [ServiceBusOutput] are the input and output alias names configured in Steps 4 and 7 - replace them with whatever alias names you used.

SELECT
       cast([DeviceID] as nvarchar(MAX)) as DeviceID
      ,cast([Parameter] as nvarchar(MAX)) as Parameter
      ,cast([Datetimestamp] as datetime) as Datetimestamp
      ,cast([val] as float) as val
INTO
    [SQLOutput]
FROM
    [eventhubinput]


SELECT
       cast([DeviceID] as nvarchar(MAX)) as DeviceID
      ,cast([Parameter] as nvarchar(MAX)) as Parameter
      ,cast([Datetimestamp] as datetime) as Datetimestamp
      ,cast([val] as float) as val
INTO
    [ServiceBusOutput]
FROM
    [eventhubinput]
WHERE cast([val] as float) > 246.0


As you can see, there are two outputs in one single query. To test that it works fine, you can upload sample data and then check. To do this,

a) Add the below records into a text file
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "233.825983439"}
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "233.825983439"}
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "233.825983439"}
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "233.825983439"}
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "253.825983439"}
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "233.825983439"}
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "253.825983439"}
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "233.825983439"}
{"DeviceID" : "DEVICE1", "Parameter" : "Engine Voltage", "Datetimestamp" : "2019-06-19 11:53:26.629000", "Val" : "253.825983439"}


b) Once this is done, click on the input and choose "Upload sample data from file".


c) Once this is done, click on Test and you should be able to see two output tabs, one for SQL and the other for Service Bus, as shown in the image below -


Step 8 - Logic Apps to check the data in the SQL Server table and create a record if the value is greater than the threshold

a) Creation of Logic apps:
Search for the Logic Apps service in Azure and click on Add; you should be able to see the image below -

Name: The name of the logic app
Subscription: The name of the subscription you want
Resource Group: Use existing
Location: The region in which you want the resource to be present.
Log Analytics: In case you want to monitor the workflows you can have it on, else it can be off.

Logic to be implemented -
Once a record is inserted into a table in the Azure SQL Database,
check if the value is greater than the threshold value.
If the condition is true, then insert the value into a new table.

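As a sanity check of the rule itself (outside Logic Apps), a minimal Python sketch of the same condition is shown below. The dictionary shape mirrors the DemoIot and Fact_ExceptionRecords columns; the function name and the "Threshold breach" label are illustrative choices, not something the Logic App requires:

THRESHOLD = 246.0  # same threshold used in the Logic App condition

def flag_exception(reading):
    # Return a Fact_ExceptionRecords-shaped dict if the reading breaches the threshold, else None
    if float(reading["val"]) > THRESHOLD:
        return {
            "DeviceID": reading["DeviceID"],
            "Date": reading["Datetimestamp"],
            "Type": "Threshold breach",
            "Description": "This has been flagged because the val is %s" % reading["val"],
        }
    return None

# Example usage
print(flag_exception({"DeviceID": "Device 1", "Parameter": "Engine RPM",
                      "Datetimestamp": "2019-06-19 11:53:26", "val": 253.8}))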

b) Creation of flow:
Click on the Logic app designer and you should be taken to a screen wherein you can add the flows and implement your logic.

Once you do this you should be able to see the list of all inputs. We will choose SQL Server as an input.

Once you select SQL Server, it shows you two triggers for starting the flow. (Please note you can only choose a trigger as the first step; you will not be allowed to use an action.)
The two options being -
1) When an item is created : This means the table should have a column with an auto-increment feature. If the table does not have this, then the table will not appear in the dropdown.
2) When an item is modified : This means the table should have a rowversion column so that it can detect any modification to the record, which will initiate the workflow.



In our case the trigger will be when a new record is inserted into the table, as there is no scenario in which we will be updating a record. Once you choose this, you will be able to see the screens below -
Change the interval from 3 minutes to 1 second. This denotes the interval at which the service will check for new events.


The connection in the snapshot is already present; if you need to add one, click on New connection and you should be able to see the screen as shown in the snapshot below -


Enter the credentials for the database, and choose connect via On-Premises Data Gateway if your database is not in the Azure environment or is on a virtual machine inside Azure.

Once this is done, choose the name of the table from the dropdown. If you want to add any parameters that help with filtering or ordering, or if you want to obtain specific columns from the table, you can mention them here.


Once this is done, click on New step and you will be provided with a lot of options and suggestions. Click on Condition, based on the logic which we are planning to implement.


When you click on this, a condition box comes up. Leave the condition as And by default. Place the cursor on "Choose a value" and a pop-up with dynamic content comes up; it has the list of all the columns which you have in the table. Choose the column "val" with "is greater than" in this case and enter the number 246.


Go to the true condition below and click on "Add an action", choose SQL Server and you will see a list of options. Choose "Insert row"; you will see a dropdown populated with tables based on the database connection. Choose the table "Fact_ExceptionRecords".


In Add new parameter, choose the columns in the table as shown in the snapshot below.


Once you choose the columns, click on them one by one and you should be able to see the input columns in the dynamic content. Use the appropriate column names, and for the description add a few words such as "This has been flagged because the val is " followed by the Val column, as shown in the image below.


Once this is done, click on save.


Once the Logic App starts running, you should be able to see the records in Runs history as shown in the snapshot below. A run should appear as Succeeded. If there is any error it will show as Failed, and if the condition is not met it will be shown as Skipped.


Now, if you run the code which generates the data, you should be able to see spikes in the messages on IoT Hub, Event Hub, and Stream Analytics, and you should be able to see the data in both tables of the SQL Server database.
Do let me know what you think about this blog, whether it has helped you or whether you feel anything needs to be improved.

I will try to cover more Azure services in future blogs.