Window operation and groupBy operation

Window operation

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()


The window operation is specific to continuous streams: you set the size (and, optionally, the slide interval) of a time window, and groupBy then groups rows by the window they fall into.
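The window assignment itself is simple arithmetic: with a 10-minute window sliding every 5 minutes, each event timestamp falls into two overlapping windows. A plain-Scala sketch of that assignment (a hypothetical helper for intuition, not part of Spark's API):

```scala
object SlidingWindows {
  // Returns the [start, end) intervals (in seconds, for timestamps >= 0)
  // of every sliding window that contains the given timestamp, mirroring
  // what window($"timestamp", "10 minutes", "5 minutes") computes per row.
  def windowsFor(ts: Long, window: Long, slide: Long): Seq[(Long, Long)] = {
    require(window > 0 && slide > 0)
    // Latest slide boundary at or before the timestamp.
    val lastStart = (ts / slide) * slide
    // Earliest candidate start: at most ceil(window / slide) candidates back.
    val firstStart = lastStart - ((window - 1) / slide) * slide
    (firstStart to lastStart by slide)
      .filter(start => start <= ts && ts < start + window)
      .map(start => (start, start + window))
  }
}
```

For a 600-second window sliding every 300 seconds, an event at t = 650 lands in the windows [300, 900) and [600, 1200); with slide equal to window (a tumbling window) each event lands in exactly one.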

Let's look at the groupBy operation on the Dataset.

groupBy operation

Definition:

def groupBy(cols: Column*): RelationalGroupedDataset = {
  RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
}

This creates a new RelationalGroupedDataset object. The most important method on this object is toDF:

private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
  val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
    groupingExprs ++ aggExprs
  } else {
    aggExprs
  }
  val aliasedAgg = aggregates.map(alias)
  groupType match {
    case RelationalGroupedDataset.GroupByType =>
      Dataset.ofRows(df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
    case RelationalGroupedDataset.RollupType =>
      Dataset.ofRows(
        df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
    case RelationalGroupedDataset.CubeType =>
      Dataset.ofRows(
        df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
    case RelationalGroupedDataset.PivotType(pivotCol, values) =>
      val aliasedGrps = groupingExprs.map(alias)
      Dataset.ofRows(
        df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.logicalPlan))
  }
}
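Two details of toDF are worth noting: dataFrameRetainGroupColumns (on by default) decides whether the grouping columns appear in the output alongside the aggregates, and groupType is fixed by whichever entry point was called (groupBy, rollup, cube, or pivot). The column-retention logic can be sketched in plain Scala (simplified stand-ins, not the real catalyst types):

```scala
// Mirrors the expression-list assembly at the top of toDF: with
// retainGroupColumns on (the default), grouping columns are prepended
// to the aggregate expressions, so they show up in the result schema.
def assembleOutput(groupingExprs: Seq[String],
                   aggExprs: Seq[String],
                   retainGroupColumns: Boolean): Seq[String] =
  if (retainGroupColumns) groupingExprs ++ aggExprs else aggExprs
```

This is why the windowed count example yields a result with window and word columns next to count, rather than a bare count column.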

Let's look at the plain groupBy branch:

Dataset.ofRows(df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))

Aggregate here is a logical plan node; to see how groupBy works, we just need to look at the implementation mechanism of Aggregate.

The implementation of Aggregate involves the related classes in the catalyst package.
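For intuition before diving into catalyst: Aggregate is essentially a unary logical-plan node carrying the grouping expressions, the aggregate expressions, and a single child plan. A stripped-down plain-Scala model (illustrative only; the real classes live in org.apache.spark.sql.catalyst.plans.logical):

```scala
// Simplified model of a catalyst logical plan: a tree of nodes.
sealed trait LogicalPlan { def children: Seq[LogicalPlan] }

// Leaf node standing in for the source relation (here, the words stream).
case class Relation(name: String) extends LogicalPlan {
  def children: Seq[LogicalPlan] = Nil
}

// Matches the shape Aggregate(groupingExprs, aliasedAgg, df.logicalPlan):
// one child plan, wrapped with grouping and aggregate expressions.
case class Aggregate(groupingExprs: Seq[String],
                     aggExprs: Seq[String],
                     child: LogicalPlan) extends LogicalPlan {
  def children: Seq[LogicalPlan] = Seq(child)
}

// The plan built for the windowed count example (expressions as strings
// here; catalyst uses typed Expression trees).
val plan = Aggregate(Seq("window", "word"), Seq("count(1) AS count"), Relation("words"))
```

The optimizer and execution layers then walk this tree, which is why understanding Aggregate means looking at its catalyst classes rather than at RelationalGroupedDataset itself.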

Tags: Windows Spark

Posted on Wed, 09 Oct 2019 13:27:26 -0700 by SulleyMonstersInc