Traffic Data Monitoring Using IoT, Kafka and Spark Streaming
Posted by: cms_admin
skip_previous fast_rewind skip_next fast_forward
Keywords:
Tóm tắt ý chính:
 1. Giới thiệu - Xe được kết nối IoT và Spark Spark
 2. Công nghệ và Công cụ cho ứng dụng này
 3. Ứng dụng tạo các sự kiện Dữ liệu IoT bằng Apache Kafka
 4. Ứng dụng xử lý luồng dữ liệu IoT bằng Spark Streaming.Process và chuyển đổi các sự kiện IoTData thành Tổng số lưu lượng truy cập, số lượng lưu lượng truy cập cửa sổ và chi tiết lưu lượng truy cập POI
 5. Xây dựng Bảng điều khiển giám sát dữ liệu lưu lượng IoT bằng Spring Boot, Web socket, jQuery, Sockjs và Bootstrap
 
Tạm dịch sơ từ bài viết gốc: https://www.infoq.com/articles/traffic-data-monitoring-iot-kafka-and-spark-streaming

IoT và phương tiện kết nối
Internet of Things (IoT) là một công nghệ đột phá đang nổi lên và đang trở thành một chủ đề được quan tâm ngày càng tăng giữa những người khổng lồ công nghệ và cộng đồng doanh nghiệp. Các thành phần IoT là các thiết bị được kết nối với nhau qua mạng, được nhúng với các cảm biến, phần mềm và ứng dụng thông minh để chúng có thể thu thập và trao đổi dữ liệu với nhau hoặc với các trung tâm dữ liệu / đám mây. Dữ liệu được tạo bởi các thiết bị IoT có khối lượng lớn và ngẫu nhiên về bản chất và cần được phân tích bằng cách sử dụng công cụ phân tích dữ liệu lớn để trích xuất thông tin quan trọng hoặc để hiểu các mẫu hành vi của người dùng.

Một trong những lĩnh vực mà IoT đang mở đường là các phương tiện được kết nối. Theo Gartner, đến năm 2020, sẽ có một phần tư tỷ xe được kết nối trên đường, được tự động hóa hơn, cung cấp các dịch vụ mới trên xe như hệ thống định vị nâng cao, cập nhật giao thông thời gian thực, cảnh báo thời tiết và tích hợp với bảng điều khiển giám sát và điện thoại thông minh. Để xử lý dữ liệu được tạo bởi các phương tiện được kết nối IoT, dữ liệu được truyền đến các bộ xử lý dữ liệu lớn nằm trong đám mây hoặc trung tâm dữ liệu. Một phương tiện được kết nối IoT cung cấp thông tin thời gian thực của phương tiện như tốc độ, mức nhiên liệu, tên tuyến đường, vĩ độ và kinh độ của xe, v.v. Thông tin này có thể được phân tích và dữ liệu có thể được trích xuất và chuyển đổi sang kết quả cuối cùng có thể được gửi lại cho xe hoặc đến một bảng điều khiển giám sát. Ví dụ: sử dụng thông tin được thu thập cho các phương tiện khác nhau, chúng tôi có thể phân tích và giám sát lưu lượng trên một tuyến đường cụ thể. Trong bài viết này, chúng tôi sẽ sử dụng Apache Spark để phân tích và xử lý dữ liệu Xe cộ được kết nối IoT và gửi dữ liệu đã xử lý đến bảng điều khiển giám sát giao thông thời gian thực.

Spark là gì
Spark là một hệ thống xử lý dữ liệu lớn nguồn mở, nhanh và có mục đích chung cho điện toán cụm. Spark có thể chạy ở chế độ độc lập, Hadoop YARN hoặc Apache Mesos. Để phát triển ứng dụng, Spark cung cấp API phong phú về Scala, Java và Python. Ngoài công cụ lõi Spark, Spark còn có một số thư viện cung cấp API cho tính toán song song.

 • Spark SQL- API để chạy SQL như các truy vấn trên tập dữ liệu.
 • Spark MLlib - Thư viện máy học.
 • Spark GraphX ​​- API cho tính toán đồ thị.
 • Spark Streaming - API để xử lý dữ liệu phát trực tuyến theo thời gian thực.

Máy trong đó ứng dụng spark (Spark Context) chạy được gọi là Driver. Trình điều khiển (Driver) thực hiện các hoạt động song song khác nhau trên các nút hoặc cụm worker.
Spark sử dụng khái niệm Bộ dữ liệu phân tán đàn hồi (RDD), đại diện cho một bộ collection các đối tượng chỉ đọc được phân vùng trên một tập hợp các máy có thể được xây dựng lại nếu mất phân vùng. Để biết thêm chi tiết, tham khảo bài viết "Xử lý dữ liệu lớn với Spark Spark - Phần 1: Giới thiệu".

Spark Streaming
Spark Streaming là một phần mở rộng của Spark Core cung cấp khả năng xử lý lỗi dữ liệu phát trực tiếp. Spark streaming chia luồng đến thành các lô vi của các khoảng thời gian xác định và trả về Dux. Dux đại diện cho luồng dữ liệu liên tục được nhập từ các nguồn như Kafka, Flume, Twitter hoặc HDFS. Các luồng được xử lý và đẩy ra hệ thống tập tin, cơ sở dữ liệu và bảng điều khiển trực tiếp. Tham khảo bài viết Xử lý dữ liệu lớn với Apache Spark - Phần 3: Spark Streaming trực tuyến để biết thêm chi tiết.

Kafka
Apache Kafka là hệ thống nhắn tin phân tán thông lượng cao, trong đó nhiều nhà sản xuất gửi dữ liệu đến cụm Kafka và lần lượt phục vụ chúng cho người tiêu dùng. Nó là một dịch vụ nhật ký cam kết phân tán, phân vùng, nhân rộng. Vui lòng tham khảo bài viết Apache Kafka: Hệ thống nhắn tin phân tán thế hệ tiếp theo để biết thêm chi tiết.

Ứng dụng giám sát dữ liệu giao thông
Ứng dụng chúng tôi sẽ phát triển là một ứng dụng theo dõi và xử lý dữ liệu IoT bằng Spark Streaming. Ứng dụng này sẽ xử lý dữ liệu IoT thời gian thực được gửi bởi các phương tiện được kết nối và sử dụng dữ liệu đó để giám sát lưu lượng trên các tuyến đường khác nhau. Chúng tôi sẽ chia ứng dụng này thành ba mô-đun sau. Các mô-đun này là các ứng dụng Maven độc lập được viết bằng Java và có thể được xây dựng và chạy độc lập.

IoT Producer:
Các phương tiện được kết nối tạo ra các thông điệp IoT được thu thập bởi một nhà môi giới tin nhắn và được gửi đến ứng dụng phát trực tuyến để xử lý. Trong ứng dụng mẫu của chúng tôi, nhà sản xuất dữ liệu IoT là một ứng dụng giả lập cho các phương tiện được kết nối và sử dụng Apache Kafka để tạo các sự kiện dữ liệu IoT.

Bộ xử lý dữ liệu IoT:
Đây là một ứng dụng Spark Streaming tiêu thụ các luồng dữ liệu IoT và xử lý chúng để phân tích dữ liệu lưu lượng. Bộ xử lý dữ liệu IoT cung cấp fol
[image]


Technologies and Tools

Following table shows the technologies and tools used in the traffic data monitoring application.

Download and install the tools and set the environment variables as given in the installation document of respective tools. Below are the few setups we need for this application.

 • Create a topic called “iot-data-event” for this application using below Kafka command.
 • bin/kafka-topics.sh --create --zookeeper localhost:2181 -
  -replication-factor 1 --partitions 1 --topic iot-data-
  event

 • Create tables in Cassandra database using below commands.
 • CREATE KEYSPACE IF NOT EXISTS TrafficKeySpace WITH 
  replication = {'class':'SimpleStrategy', 
  'replication_factor':1};
  
  CREATE TABLE TrafficKeySpace.Total_Traffic (routeId text, 
  vehicleType text, totalCount bigint, timeStamp 
  timestamp,recordDate text,PRIMARY KEY 
  (routeId,recordDate,vehicleType));
  
  CREATE TABLE TrafficKeySpace.Window_Traffic (routeId 
  text, vehicleType text, totalCount bigint, timeStamp 
  timestamp,recordDate text,PRIMARY KEY 
  (routeId,recordDate,vehicleType));
  
  CREATE TABLE TrafficKeySpace.poi_traffic(vehicleid text , 
  vehicletype text , distance bigint, timeStamp 
  timestamp,PRIMARY KEY (vehicleid));
  


IoT Data Producer

IoT devices or connected vehicles generate huge amount of data which are extremely random and time-sensitive. These data are captured by messaging system filtered, routed and ingested to stream processors. In this article we are using Kafka as IoT data producer. This article will show the relevant portion of java classes or configuration files. For complete files, please check GitHub project.

Let’s start with Maven pom.xml file.

 <dependency>
 	 <groupId>org.apache.kafka</groupId>
	 <artifactId>kafka_2.10</artifactId>
	 <version>0.8.1</version>
 </dependency>
 <dependency>
	 <groupId>com.fasterxml.jackson.core</groupId>
	 <artifactId>jackson-core</artifactId>
	 <version>2.6.6</version>
 </dependency>
 <dependency>
	 <groupId>com.fasterxml.jackson.core</groupId>
	 <artifactId>jackson-databind</artifactId>
	 <version>2.6.6</version>
 </dependency>
 <dependency>
	 <groupId>com.fasterxml.jackson.core</groupId>
	 <artifactId>jackson-annotations</artifactId>
	 <version>2.6.6</version>
 </dependency>

We use IoTData class to define the attributes of connected vehicle.

public class IoTData implements Serializable{
	private String vehicleId;
	private String vehicleType;
	private String routeId;
	private String longitude;
	private String latitude;
}
IoTDataProducer application will produce IoT data in JSON format. We need to write a custom Kafka serializer class which will serialize IoTData objects. For this we need to implement toBytes method of kafka.serializer.Encoder interface.

public class IoTDataEncoder implements Encoder<IoTData> {
	public byte[] toBytes(IoTData iotEvent) {
		try {
			String msg = 
objectMapper.writeValueAsString(iotEvent);			    
return msg.getBytes();
		} catch (JsonProcessingException e) {}	
  }
}

Now let’s write IoTDataProducer class. We will keep the configuration details of ZooKeeper and Kafka in iot-kafka.properties which is inside resources folder. This folder also contains log4j.properties file. Remember we have created iot-data-event topic while setting up environment for Kafka. IoTDataProducerwill produce message on this topic. We will set the value for serializer.classproperty for Kafka as IoTDataEncoder.

public class IoTDataProducer {
  properties.put("zookeeper.connect", zookeeper);
  properties.put("metadata.broker.list", brokerList);
  properties.put("request.required.acks", "1");
  properties.put("serializer.class",
    "com.iot.app.kafka.util.IoTDataEncoder");
  Producer<String, IoTData> producer = new Producer<String, 
    IoTData>(new ProducerConfig(properties));
}
In this application we will have to define the Routes and Vehicle types. For sake of simplicity we are using three routes “Route-37”, “Route-43” and “Route-82”. We have five different types of IoT Connected Vehicles which are "Large Truck", "Small Truck", "Private Car", "Bus" and "Taxi". Connected Vehicles send events in regular intervals while moving on a particular route. For this article we assume that each vehicle will send five events in random order with delay of 1 to 3 seconds. We are using Java Random class to generate values for attributes of Connected Vehicles.

private void generateIoTEvent(Producer<String, IoTData>  
    producer, String topic) {
  IoTData event = new IoTData(vehicleId, vehicleType, 
    routeId, latitude, longitude, timestamp,  
    speed,fuelLevel);
  KeyedMessage<String, IoTData> data = new 
    KeyedMessage<String, IoTData>(topic, event);
  producer.send(data);
}
 IoT Producer application is ready. We can build and run this application using below commands.

mvn package

mvn exec:java -
Dexec.mainClass="com.iot.app.kafka.producer.IoTDataProducer"

or

java -jar iot-kafka-producer-1.0.0.jar
You will see the IoT data events as shown in below snapshot.

Figure 2. IoT Data events


IoT Data Processor

In this section, we will develop a Spark Streaming application which will consume real time IoT Data messages and process them. This application is available at GitHub project.

Below is the pom.xml file for IoT Data Processor application.

 <dependency>
	 <groupId>org.apache.spark</groupId>
	 <artifactId>spark-core_2.10</artifactId>
	 <version>1.6.2</version>
 </dependency>
 <dependency>
	 <groupId>org.apache.spark</groupId>
	 <artifactId>spark-streaming_2.10</artifactId>
	 <version>1.6.2</version>
 </dependency>
 <dependency>
	 <groupId>org.apache.spark</groupId>
	 <artifactId>spark-streaming-
kafka_2.10</artifactId>
	 <version>1.6.2</version>
 </dependency>
 <dependency>
	 <groupId>org.apache.spark</groupId>
	 <artifactId>spark-sql_2.10</artifactId>
	 <version>1.6.2</version>
 </dependency>
 <dependency>
	 <groupId>com.datastax.spark</groupId>
	 <artifactId>spark-cassandra-
connector_2.10</artifactId>
	 <version>1.6.0</version>
 </dependency>

We need to write custom deserializer class which will deserialize IoTData JSON String to IoTData object. We need to implement fromBytes method for kafka.serializer.Decoder interface.

public class IoTDataDecoder implements Decoder<IoTData> {	
	public IoTData fromBytes(byte[] bytes) {
		try {
			return objectMapper.readValue(bytes, 
IoTData.class);
		} catch (Exception e) {}
  }
}

The resources folder will have iot-spark.properties file which has configuration key-value pair for Kafka, Spark and Cassandra. We will write IoTDataProcessor class using Spark APIs. We will start with creating SparkConf object by setting the Spark and Cassandra properties.

SparkConf conf = new SparkConf()
.setAppName(prop.getProperty("com.iot.app.spark.app.name"))
.set("spark.cassandra.connection.host",prop.getProperty("com.
iot.app.cassandra.host"))
Our application will collect streaming data in batch of five seconds. Create Java Streaming Context using SparkConf object and Duration value of five seconds. Set checkpoint directory in Java Streaming context. Read IoT date stream using KafkaUtils.createDirectStream API.

JavaStreamingContext jssc = new 
JavaStreamingContext(conf,Durations.seconds(5));
jssc.checkpoint(prop.getProperty("com.iot.app.spark.checkpoint.dir"));
JavaPairInputDStream<String, IoTData> directKafkaStream = 
KafkaUtils.createDirectStream(
			    jssc,
			    String.class,
			    IoTData.class,
			    StringDecoder.class,
			    IoTDataDecoder.class,
			    kafkaParams,
			    topicsSet
			  );

We will do first transformation using map operation to get the DStream of IotDataobjects. Next we want to have a pair DStream in which key will be vehicleId and value will be IoTData object and for this we will call mapToPair transformation. Current DStream can have more than one event data for same vehicleId. We want unique Vehicles per incoming batch and for that we will call reduceByKey transformation.

JavaDStream<IoTData> nonFilteredIotDataStream = 
  directKafkaStream.map(tuple -> tuple._2());
JavaPairDStream<String,IoTData> iotDataPairStream = 
  nonFilteredIotDataStream.mapToPair(iot -> new 
Tuple2<String,IoTData>(iot.getVehicleId(),iot)).reduceByKey((
  a, b) -> a);
In order to calculate the count of vehicles over the time we need to keep record of vehicles which have already been processed in previous Dstreams. To achieve this, we are going to use Spark’s stateful operation. We will use mapWithState operation available on DStreams of key-value pairs. mapWithState operation uses StateSpec.function for maintaining the state data for key. Below processedVehicleFunc is the StateSpec.function for this application.

private static final Function3<String, Optional<IoTData>, 
    State<Boolean>, Tuple2<IoTData,Boolean>> 
    processedVehicleFunc = (String, iot, state) -> {
  Tuple2<IoTData,Boolean> vehicle = new 
    Tuple2<>(iot.get(),false);
		if(state.exists()){
			vehicle = new Tuple2<>(iot.get(),true);
		}else{
			state.update(Boolean.TRUE);
		}			
		return vehicle;
	};
For stateful operation Spark stores the RDD state in on machines across the cluster. For long running application there could be large amount of data which will be stored in memory on worker machines. Based on type of application you can decide for how long you want to maintain state for a key. In this article we will keep the vehiclieId in memory for one hour. In below code we are applying a map operation and then filter operation to get the vehicles which are not processed.

JavaMapWithStateDStream<String, IoTData, Boolean, 
  Tuple2<IoTData,Boolean>> iotDStreamWithStatePairs = 
    iotDataPairStream.mapWithState(
    StateSpec.function(processedVehicleFunc)
    .timeout(Durations.seconds(3600)));

JavaDStream<Tuple2<IoTData,Boolean>> filteredIotDStreams = 
  iotDStreamWithStatePairs.map(tuple2 -> 
  tuple2).filter(tuple -> tuple._2.equals(Boolean.FALSE));

Total Traffic Processing

Total traffic data processing from IoT Data Stream using Spark is illustrated in Figure 3 below.

(Click on the image to enlarge it)

Figure 3. Total Traffic Data Processing Using Spark

Below is the transformation to process total traffic count for different types of vehicles on each route. We identify unique vehicle by routeId and vehicleType so we have created a AggregateKey class which has these two attributes. AggregateKey object will be key in pairDstream. Value for this pairDstream will be the count of vehicles. We use mapToPair transformation for each count and reduceByKey to combine the count of same AggregateKey in pair. For total vehicle count or traffic data we need to maintain state of AggregateKey.

JavaPairDStream<AggregateKey, Long> countDStreamPair = 
filteredIotDataStream
	.mapToPair(iot -> new Tuple2<>(new 
AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
	.reduceByKey((a, b) -> a + b);
Below function maintains running sum for AggregateKey over the time.

    long totalSum = currentSum.or(0L) + (state.exists() ? 
state.get() : 0);
    Tuple2<AggregateKey, Long> total = new Tuple2<>(key, 
totalSum);
    state.update(totalSum);
With above series of transformations, we get routeId, vehicleType and total count, which is the total traffic data. We will store this data in Cassandra database so that this can be sent to Dashboard.

We need to transform the JavaDstream from previous transformation to JavaDStream<TotalTrafficData> so that this can be persisted in Cassandra database. We write a new function for this transformation.

private static final Function<Tuple2<AggregateKey, Long>, 
TotalTrafficData> totalTrafficDataFunc = (tuple -> {
		TotalTrafficData trafficData = new 
TotalTrafficData();
		trafficData.setRouteId(tuple._1().getRouteId());

	trafficData.setVehicleType(tuple._1().getVehicleType());
		trafficData.setTotalCount(tuple._2());
		trafficData.setTimeStamp(new Date());
		trafficData.setRecordDate(new 
SimpleDateFormat("yyyy-MM-dd").format(new Date()));
		return trafficData;
	});
For saving data in Cassandra database we are using datastax’s spark Cassandra connector library. This library provides API to save DStream or RDD to Cassandra. Provide column mapping and call saveToCassandra() method.

Map<String, String> columnNameMappings = new HashMap<String, 
String>();
columnNameMappings.put("routeId", "routeid");
columnNameMappings.put("vehicleType", "vehicletype");
columnNameMappings.put("totalCount", "totalcount");
columnNameMappings.put("timeStamp", "timestamp");
columnNameMappings.put("recordDate", "recorddate");

javaFunctions(trafficDStream).writerBuilder("traffickeyspace"
, "total_traffic",
CassandraJavaUtil.mapToRow(TotalTrafficData.class, 
columnNameMappings)).saveToCassandra();

Window Traffic Processing

The second objective of IoT Data Processor is to calculate the traffic details for last 30 second Window.  We will use Spark’s Window based API for this processing. We will use reduceByKeyAndWindow method with the window duration of 30 seconds and slide interval of 10 seconds. We will not maintain any state for count here and will send the transformed data to Cassandra database. Transform the Dstream to WindowTrafficData entity class of Window_Traffic table and save to Cassandra DB like we did in total traffic processing.

JavaPairDStream<AggregateKey, Long> countDStreamPair = 
filteredIotDataStream
				.mapToPair(iot -> new Tuple2<>(new 
AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
				.reduceByKeyAndWindow((a, b) -> a + b, 
Durations.seconds(30), Durations.seconds(10));

POI (Point-of-Interest) Traffic Processing

Third and final objective of IoT Data Processor is to process information regarding the vehicles which are in the radius of defined point-of-interest (POI). Please see Figure 4 below.

Figure 4. Vehicles within POI radius

If you checked the IoTData messages generated by IoT Data Producer, it has latitude and longitude of the vehicle position. We will use this coordinates and compare with coordinates for POI. In this application we will monitor Route-37 and vehicle type Truck (Small Truck or Large Truck) which are in the radius (30 km) of POI. Let’s define coordinates and radius for POI. Set these values in broadcast variable.

POIData poiData = new POIData();
poiData.setLatitude(33.877495);
poiData.setLongitude(-95.50238);
poiData.setRadius(30);
 
Broadcast<Tuple3<POIData, String, String>> broadcastPOIValues 
= jssc.sparkContext().broadcast(new Tuple3<>(poiData,"Route-
37","Truck"));

We have to calculate distance between a vehicle and POI using their latitude and longitude positions. In this application we use "Haversine Formula” to calculate the great-circle distance between two points on earth. Please visit this website if you want to check the distance using this formula. For real-time applications you might want to use Google Map API or Open Street Map API. For our application, "Haversine" formula is sufficient and we will write GeoDistanceCalculator class and getDistance method in it.

public static double getDistance(double lat1, double lon1, 
double lat2, double lon2) {
	final int r = 6371;
	Double latDistance = Math.toRadians(lat2 - lat1);
	Double lonDistance = Math.toRadians(lon2 - lon1);
	Double a = Math.sin(latDistance / 2) * 
Math.sin(latDistance / 2) +  Math.cos(Math.toRadians(lat1))* 
Math.cos(Math.toRadians(lat2)) * Math.sin(lonDistance / 2) * 
Math.sin(lonDistance / 2);
	Double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - 
a));
	double distance = r * c;	
	return distance;
   }
For POI calculation we will use JavaDStream<IoTData> Dstream which we got after first transformation on data received from KafkaUtils. We need to apply filter transformation get Dstream of IotData in which vehicle’s routeId is equals to “Route-37”, vehicleId is like “Truck” and vehicle is in radius of POI. We need to keep this filtered IotData and POIData objects in pair and will use to transform it into entity object for (POITrafficData) Poi_Traffic table of Cassandra database.

JavaDStream<IoTData> iotDataStreamFiltered = 
nonFilteredIotDataStream
.filter(iot -> 
(iot.getRouteId().equals(broadcastPOIValues.value()._2()) && 
iot.getVehicleType().contains(broadcastPOIValues.value()._3()
) && 
GeoDistanceCalculator.isInPOIRadius(Double.valueOf(iot.getLat
itude()),
Double.valueOf(iot.getLongitude()), 
broadcastPOIValues.value()._1().getLatitude(),
broadcastPOIValues.value()._1().getLongitude(),
broadcastPOIValues.value()._1().getRadius())));

JavaPairDStream<IoTData, POIData> poiDStreamPair = 
iotDataStreamFiltered
.mapToPair(iot -> new Tuple2<>(iot, 
broadcastPOIValues.value()._1()));
Map the entity’s attributes with the table column names and call saveToCassandra() method. Please note here we are invoking withConstantTTL() method which will make this Dstream to be stored for specified time and then it will be deleted from the table. We are keeping it for two minutes.

javaFunctions(trafficDStream).writerBuilder("traffickeyspace"
, 
"poi_traffic",CassandraJavaUtil.mapToRow(POITrafficData.class
, columnNameMappings)).withConstantTTL(120)
.saveToCassandra();
All the three methods are completed and now we can invoke these methods from IotDataProcessor class and then we will start Java Streaming Context.

IoTTrafficDataProcessor iotTrafficProcessor = new 
IoTTrafficDataProcessor();
iotTrafficProcessor.processTotalTrafficData(filteredIotDataSt
ream);
iotTrafficProcessor.processWindowTrafficData(filteredIotDataS
tream);
iotTrafficProcessor.processPOIData(nonFilteredIotDataStream,b
roadcastPOIValues);
jssc.start();      
jssc.awaitTermination();
IoTDataProcessor application is ready. It’s time to build and run the application. Execute maven package command to generate jar file.

mvn package
You will get iot-spark-processor-1.0.0.jar file. In maven build we have included all the required spark libraries while assembling this jar. If you want, you can use <provided> scope for Spark related dependency in pom.xml and those libraries will not be bundled in final uber jar as they will be provided by Spark at runtime. Invoke spark-submit command like below.

spark-submit --class 
"com.iot.app.spark.processor.IoTDataProcessor” iot-spark-
processor-1.0.0.jar

IoT Data Dashboard

We will use Spring Boot for developing our dashboard application. Spring Boot provides spring data support for Cassandra database so it is easy to develop data access class and entity class. We are using spring-boot-dependencies 1.3.5 release and it supports Cassandra 2.X through library which uses DataStax Java Driver (2.0.X). We will start with pom.xml file. Code for Dashboard is available at GitHub project.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-
websocket</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-
cassandra</artifactId>
</dependency>
We will create entity classes for all three tables “Total_Traffic”, “Window_Traffic” and “Poi_Traffic”. Create DAO interfaces for all three entities which will extend CassandraRepository. Below code snippet shows DAO class for TotalTrafficData entity. We will also add custom query (if required) for fetching data from table.

@Repository
public interface TotalTrafficDataRepository extends 
CassandraRepository<TotalTrafficData>{
@Query("SELECT * FROM traffickeyspace.total_traffic WHERE 
recorddate = ?0 ALLOW FILTERING")
	 Iterable<TotalTrafficData> findTrafficDataByDate(String 
date);	 
}
We will write CassandraConfig class which will connect to Cassandra cluster and get connection for database operations.

public class CassandraConfig extends 
AbstractCassandraConfiguration{
  @Bean
  public CassandraClusterFactoryBean cluster() {
    CassandraClusterFactoryBean cluster = new 
CassandraClusterFactoryBean();

cluster.setContactPoints(environment.getProperty("com.iot.app
.cassandra.host"));

cluster.setPort(Integer.parseInt(environment.getProperty("com
.iot.app.cassandra.port")));
    return cluster;
  }
 }
We want to refresh data on dashboard automatically in fixed interval. We will use Web socket to push updated data to UI.

public class WebSocketConfig extends 
AbstractWebSocketMessageBrokerConfigurer {
public void registerStompEndpoints(StompEndpointRegistry 
registry) {
    registry.addEndpoint("/stomp").withSockJS();
  }

public void configureMessageBroker(MessageBrokerRegistry 
config) {
    config.enableSimpleBroker("/topic");
  }
}
Next we write TrafficDataService class which will pull data from Cassandra tables using repository interfaces. This service class will then send this data over the Web Socket to the dashboard in every five seconds. This class uses Spring’s SimpMessagingTemplate to send Response object to dashboard.

private SimpMessagingTemplate template;
@Scheduled(fixedRate = 5000)
public void trigger() {
		Response = new Response();
		response.setTotalTraffic(totalTrafficList);
		response.setWindowTraffic(windowTrafficList);
		response.setPoiTraffic(poiTrafficList);
		this.template.convertAndSend("/topic/trafficData", 
response);
	}
Below is the IotDataDashboard class which is a Spring Boot application class and runs on port 8080.

public class IoTDataDashboard {
	 public static void main(String[] args) {
	    SpringApplication.run(IoTDataDashboard.class, 
args);
	  }
	}
IoT Data Dashboard UI page is a html page which is available at resources/static folder. We need to add jQuery, Sockjs and Stomp javascript libraries so it can receive messages on Web Socket.

  <scripttype="text/javascript"src="js/jquery-
1.12.4.min.js"></script>
	<scripttype="text/javascript"src="js/sockjs-
1.1.1.min.js"></script>
	<scripttype="text/javascript"src="js/stomp.min.js"></script>
Add code to receive data from Web Socket and parse the Response object as JSON data.

   var totalTrafficList = jQuery("#total_traffic");
   stompClient.connect({ }, function(frame) {
	stompClient.subscribe("/topic/trafficData", 
function(data) {
			var dataList = data.body;
			var resp=jQuery.parseJSON(dataList);
						totalTrafficList.html(t_tabl_start+totalOutput+t_tabl
_end)	
}

We want to display traffic data on dashboard using graphs and charts. For that we will use Chart.js library. Chart.js provides rich API for different types of charts. Please see the Chart.js documentation page for API details. We want that this dashboard to be responsive so it can be viewed on devices of different screen sizes or resolutions. We are adding bootstrap.js library and bootstrap.css for responsive web UI.

<scripttype="text/javascript"src="js/bootstrap.min.js"></script>
<scripttype="text/javascript"src="js/Chart.min.js"></script>
<head>
  <title>IoT Traffic Data Dashboard</title>
  <linkrel="stylesheet"type="text/css"href="css/bootstrap.min.css">
  <linkrel="stylesheet"type="text/css"href="css/style.css">
</head>
IoT Data Dashboard application is ready. We can build and run this application using below commands.

mvn package

mvn exec:java -Dexec.mainClass=" 
com.iot.app.springboot.dashboard.IoTDataDashboard"
Or

java -jar iot-springboot-dashboard-1.0.0.jar
Now open the browser and enter http://localhost:8080 . You will see dashboard displaying data in tables and charts. This dashboard is refreshing data in every five seconds. IoT Traffic Data Monitoring Dashboard looks like below image.

(Click on the image to enlarge it)

Figure 5. IoT Traffic Data Monitoring Dashboard


Summary

In this article we learned how real time IoT Data Events coming from Connected Vehicles can be ingested to Spark through Kafka. Using the Spark streaming API, we processed and analysed IoT data events and transformed them in to vehicle count for different types of vehicles on different routes. We performed series of stateless and stateful transformation using Spark streaming API on Dstreams and persisted them to Cassandra database tables. We developed responsive web traffic monitoring dashboard using Spring Boot, SockJs and Bootstrap which queries data from Cassandra database and pushes to UI using web socket. These applications are available inside maven aggregator project "iot-traffic-monitor" at GitHub location.


References

Advertiment