Anomaly Detection using Spark

One of the common analytical use cases for data collected in a Big Data platform is anomaly detection. It serves multiple purposes: not only as a security check on network data, but also as a data quality check.

In an ideal scenario, you would know how many types of data are processed from a single source. But in our case, let’s say we don’t know how many different types of events are present in the data, or, in analytical terms, how many clusters exist within the dataset.

The first step in any anomaly detection system is to identify the number of clusters. Spark provides a clustering algorithm called K-means. We can either use the batch K-means algorithm, or streaming K-means if our system is going to be stream based.

To find the right number of clusters we can use the elbow method (https://bl.ocks.org/rpgove/0060ff3b656618e9136b).

Below is the code to get the WSSSE value for a given cluster size.


import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Convert each row of the training DataFrame into a dense numeric vector
val rddVec = trainingDataFrame.rdd.map(x => Vectors.dense(x.toSeq.map(_.toString.toDouble).toArray))

// Train K-means: k = 7, maxIterations = 10, runs = 5, "random" initialization, seed = 1234L
val clusters = KMeans.train(rddVec, 7, 10, 5, "random", 1234L)

// Persist the model and report the within-set sum of squared errors (WSSSE)
clusters.save(spark.sparkContext, args(0))
val WSSSE = clusters.computeCost(rddVec)
println("Within Set Sum of Squared Errors = " + WSSSE)
Plot the WSSSE values on a graph to figure out where the elbow occurs. In the example above, 7 is the cluster size, 10 is the maximum number of iterations, 5 is the number of runs, "random" is the initialization mode and 1234L is the seed value.
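
To produce the points for the elbow plot, the same training call can be repeated over a range of cluster sizes and the WSSSE recorded for each. This is a minimal sketch, assuming the rddVec from the previous step; the range 2 to 20 is only an example.

// Sketch: compute WSSSE for a range of cluster sizes for the elbow plot
// (the range 2 to 20 is an arbitrary example)
val elbowPoints = (2 to 20).map { k =>
  val model = KMeans.train(rddVec, k, 10, 5, "random", 1234L)
  (k, model.computeCost(rddVec))
}
elbowPoints.foreach { case (k, wssse) => println(s"k = $k, WSSSE = $wssse") }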

There is another method to get the right number of clusters (it should only be used when building an anomaly detection system, since the cluster count it gives will not reflect the actual number of event types in the data). In this method, we calculate the distance of each element from the centre of its cluster and take the mean of all the distances. The number of clusters with the minimum mean distance is taken as the cluster size. This tends to reduce the number of outliers or anomalies detected, but it increases the cluster size.

Below is the code to get the cluster size with our second method.

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Euclidean distance between two vectors
def distance(a: Vector, b: Vector): Double = {
  math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)
}

// Distance of a record from the centre of the cluster it is assigned to
def distToCentroid(datum: Vector, model: KMeansModel): Double = {
  val cluster = model.predict(datum)
  val centroid = model.clusterCenters(cluster)
  distance(centroid, datum)
}

// Train a K-means model with k clusters and return the mean distance to the centroids
def clusteringScore(data: RDD[Vector], k: Int): Double = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  val model = kmeans.run(data)
  val dd: RDD[(Vector, Double)] = data.map(datum => (datum, distToCentroid(datum, model)))
  dd.values.mean()
}

// Evaluate a range of cluster sizes and print (k, mean distance) pairs
val temp = (1 to 100 by 1).map(k => (k, clusteringScore(rddVec, k)))
temp.foreach(println)
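
Once the scores are collected, the cluster size with the smallest mean distance can be read off directly. This is a minimal sketch using the temp sequence above.

// Sketch: pick the k whose mean distance to the centroids is smallest
val bestK = temp.minBy(_._2)._1
println("Chosen cluster size: " + bestK)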

The second step is to determine the threshold, which is the distance from the centroid beyond which a record is identified as an anomaly. We can calculate the distance for every record and build a histogram with a bucket count of our choice (depending on the data size), which spreads the data out as much as possible. From the histogram we can identify the groups and pick a threshold that tells us where the anomalies start.

The code for the second step is given below.


// Distance of every record from its assigned centroid, using the final model
val data_distance: RDD[Double] = rddVec.map(distToCentroid(_, clusters))

// Bucket the distances into 20 bins: returns (bucket boundaries, counts)
val his = data_distance.histogram(20)

println(his._1.toList + " " + his._2.toList)
The parameter clusters is the model trained with the value of k identified in the previous step.
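
One simple way to turn the histogram into a concrete threshold is to take a high quantile of the distances, for example the 99th percentile, so that only the farthest records are flagged. The percentile here is an assumed choice for illustration, not part of the original method; adjust it after inspecting the histogram.

// Sketch: take roughly the 99th percentile of the distances as the threshold (assumed choice)
val n = data_distance.count()
val threshold = data_distance.top(math.max(1, (0.01 * n).toInt)).last
println("Anomaly threshold = " + threshold)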

We then settle on that threshold and use the same model if we are running a batch system, or supply this threshold and configuration to streaming K-means if we are running a streaming system.
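
As an illustration of the batch case, records whose distance from their centroid exceeds the chosen threshold can be flagged directly. This is only a sketch; it assumes the clusters model, the distToCentroid helper and the threshold value from the steps above, and flaggedAnomalies is a hypothetical name.

// Sketch (batch case): flag records that lie farther from their centroid than the threshold.
// Assumes `clusters`, `distToCentroid` and `threshold` from the previous steps.
val flaggedAnomalies = rddVec.filter(datum => distToCentroid(datum, clusters) > threshold)
println("Number of anomalies detected: " + flaggedAnomalies.count())

For the streaming case, StreamingKMeans in spark.mllib exposes setK and setDecayFactor, so the same cluster count can be configured and a comparable threshold check applied to each micro-batch.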