How to dynamically parse the schema of JSON data from Kafka with the schema_of_json method in Structured Streaming

In production, the fields in a message may change over time, for example a new field gets added, but the Spark job cannot simply be stopped. So instead of hard-coding the schema in the program, consider inferring it from the JSON strings in the Kafka messages. Alternatively, you can keep the schema in a configuration file, distribute it with a broadcast variable, and refresh it periodically; that is another workable approach.

Schema inference also worked for Kafka's JSON data in the old DStream-based Spark Streaming, as follows:

dStream.map(_.value).foreachRDD(rdd => {
  ...
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._  // provides the Encoder[String] that createDataset needs
  val df = spark.read.json(spark.createDataset(rdd))
  ...
})

After the JSON strings are parsed this way, the keys of the JSON objects directly become the column names of the DataFrame.
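
For example, a minimal sketch with made-up sample values, runnable in spark-shell:

import spark.implicits._  // for .toDS() on a local Seq of strings

val sample = Seq("""{"cardno":"6222020000000001","userid":"u001"}""").toDS()
spark.read.json(sample).printSchema()
// root
//  |-- cardno: string (nullable = true)
//  |-- userid: string (nullable = true)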

But in Structured Streaming the DataFrame comes straight from the source, so this trick is not available. Digging through the API turns up a function that infers a schema from a JSON string: schema_of_json

/**
 * Parses a JSON string and infers its schema in DDL format.
 *
 * @param json a JSON string.
 *
 * @group collection_funcs
 * @since 2.4.0
 */
def schema_of_json(json: String): Column = schema_of_json(lit(json))

/**
 * Parses a JSON string and infers its schema in DDL format.
 *
 * @param json a string literal containing a JSON string.
 *
 * @group collection_funcs
 * @since 2.4.0
 */
def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson(json.expr))

This is pretty much first-hand information: I googled this function for a while, and apart from two Stack Overflow posts there is almost nothing about it; even the approach given there still fails, as shown below.

How to query JSON data column using Spark DataFrames?

Implicit schema discovery on a JSON-formatted Spark DataFrame column?

First build a test DataFrame df, with the JSON messages in a single string column col. A minimal sketch (the sample values below are made up; the field names match the inferred schema shown later):
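
import spark.implicits._  // already imported in spark-shell

// hypothetical sample records, two rows to match the output below
val df = Seq(
  """{"cardno":"6222020000000001","cardtype":"debit","flag":"0","times":"3","userid":"u001"}""",
  """{"cardno":"6222020000000002","cardtype":"credit","flag":"1","times":"5","userid":"u002"}"""
).toDF("col")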

Trying the second post's method, df.select(schema_of_json($"col")), throws an error:

scala> df.select(schema_of_json($"col"))
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`col`)' due to data type mismatch: The input json should be a string literal and not null; however, got `col`.;;

Reading the error message, schema_of_json must be given a string literal, so try passing a real string, e.g. the first row's value (sketched below):
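
// a sketch of the working call: pass the first row's JSON value as a plain String
df.select(schema_of_json(df.take(1)(0).getString(0)) as "schema").show(false)
// the schema column now contains:
// struct<cardno:string,cardtype:string,flag:string,times:string,userid:string>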


That is, take the JSON value out of the first Row and use that string for inference; this works.

The final version is as follows:

scala> df.select(schema_of_json(df.take(1)(0).get(0).toString).alias("schema")).select(regexp_extract($"schema","struct<(.*?)>",1) as "schema").show(false)

+--------------------------------------------------------------------+
|schema                                                              |
+--------------------------------------------------------------------+
|cardno:string,cardtype:string,flag:string,times:string,userid:string|
|cardno:string,cardtype:string,flag:string,times:string,userid:string|
+--------------------------------------------------------------------+

Finally, build a StructType from the inferred schema string:

scala> val str = df.select(schema_of_json(df.take(1)(0).get(0).toString).alias("schema")).select(regexp_extract($"schema","struct<(.*?)>",1)).take(1)(0).getAs[String](0)
str: String = cardno:string,cardtype:string,flag:string,times:string,userid:string

scala> val columns = str.split(",").map(x=>x.split(":")).map(x=>x(0))
columns: Array[String] = Array(cardno, cardtype, flag, times, userid)

scala> var schema = (new StructType)
schema: org.apache.spark.sql.types.StructType = StructType()

scala> columns.map(x=>{schema = schema.add(x,StringType,true)})
res154: Array[Unit] = Array((), (), (), (), ())

scala> schema
res159: org.apache.spark.sql.types.StructType = StructType(StructField(cardno,StringType,true), StructField(cardtype,StringType,true), StructField(flag,StringType,true), 
StructField(times,StringType,true), StructField(userid,StringType,true))

scala> schema.simpleString
res160: String = struct<cardno:string,cardtype:string,flag:string,times:string,userid:string>
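
Incidentally, this manual string surgery can be skipped if your Spark version exposes DataType.fromDDL (it does in 2.4, the version that introduced schema_of_json): it parses the full struct<...> string directly, so the outer struct<> never needs to be stripped with regexp_extract. A sketch:

import org.apache.spark.sql.types.{DataType, StructType}

// parse the schema string exactly as schema_of_json returned it
val ddl = "struct<cardno:string,cardtype:string,flag:string,times:string,userid:string>"
val schema2 = DataType.fromDDL(ddl).asInstanceOf[StructType]
// schema2.simpleString == ddl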

The inferred schema can now be used to parse the JSON:

df.select(from_json($"col",schema) as "json_value").select("json_value.*")
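
Also worth noting: since Spark 2.4, from_json has an overload that takes the schema as a Column, so schema_of_json can be plugged in directly, assuming the first row's JSON is representative of all rows. A sketch:

// sample one row's JSON and let Spark infer the schema inline
val sampleJson = df.take(1)(0).getString(0)
df.select(from_json($"col", schema_of_json(sampleJson)) as "json_value").select("json_value.*")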

But this is still only suitable for flat schemas. With a nested structure, such as the Stack Overflow example, there is a problem: how do you build the following schema?

struct<abc:struct<name:string>,pqr:struct<address:string>> 

Reference: How to create schema (StructType) with one or more StructTypes?

import org.apache.spark.sql.types._
val name = new StructType().add($"name".string)
scala> println(name.simpleString)
struct<name:string>

val address = new StructType().add($"address".string)
scala> println(address.simpleString)
struct<address:string>

val schema = new StructType().add("abc", name).add("pqr", address)
scala> println(schema.simpleString)
struct<abc:struct<name:string>,pqr:struct<address:string>>

scala> schema.simpleString == "struct<abc:struct<name:string>,pqr:struct<address:string>>"
res4: Boolean = true

scala> schema.printTreeString
root
 |-- abc: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |-- pqr: struct (nullable = true)
 |    |-- address: string (nullable = true)

For a case like this, the regexp_extract and split(",") parsing above breaks down (the non-greedy struct<(.*?)> match stops at the first >, truncating the nested fields), so we would need to write a dedicated method to parse the inferred string.
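
Alternatively, before writing such a parser, note that the DataType.fromDDL route sketched earlier already handles nesting, as long as the outer struct<> is not stripped off:

import org.apache.spark.sql.types.{DataType, StructType}

val nested = DataType.fromDDL("struct<abc:struct<name:string>,pqr:struct<address:string>>")
  .asInstanceOf[StructType]
nested.printTreeString()
// prints the same tree as above: abc.name and pqr.address, all nullable strings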

Finally, allow me to shamelessly recommend a summary project covering Linux, big data, machine learning, data analysis, algorithms, and more; recommendations of further resources are welcome: Coding-Now

It collects my study notes, e-books, video resources, and the blogs, websites, and tools I have found useful.
https://github.com/josonle/BigData-Learning
