RDD Programming Learning Notes 3: Data Reading and Writing

Local file read and write

scala> var textFile = sc.textFile("file:///root/1.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///root/1.txt MapPartitionsRDD[57] at textFile at <console>:24

scala> textFile.saveAsTextFile("file:///root/writeback")

scala> textFile.foreach(println)
hadoop	hello
bianhao	shan
nihao
hello	shan
hello	bianhao
	nihao
lizhao	hello
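
Note that saveAsTextFile takes a directory, not a file: Spark writes one part-0000N file per partition (plus a _SUCCESS marker) under file:///root/writeback. To load the data back, point textFile at that directory. A minimal sketch, reusing the path written above:

// Reading the written directory back; Spark concatenates all part files
val reloaded = sc.textFile("file:///root/writeback")
reloaded.foreach(println)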

JSON file reading and parsing

JSON file reading

############ Reading JSON files from HDFS works the same way ###########
scala> val jsonStr = sc.textFile("file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json")
jsonStr: org.apache.spark.rdd.RDD[String] = file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json MapPartitionsRDD[65] at textFile at <console>:24

scala> jsonStr.foreach
foreach   foreachAsync   foreachPartition   foreachPartitionAsync

scala> jsonStr.foreach(println)
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

JSON parsing

Let's learn through an example.
For the details of sbt packaging, see Teacher Ziyu's lab course; here I only run through the steps. (In practice, once sbt has packaged successfully, later runs don't need an Internet connection, as long as your Scala and Spark versions stay the same, that is, the dependencies in simple.sbt don't change.)

############### Create the project files ###################
[root@master ~]# pwd
/root
[root@master ~]# cd code
[root@master code]# ls
scala
[root@master code]# mkdir spark
[root@master code]# cd spark/ && mkdir json && cd j*
[root@master json]# ls
[root@master json]# pwd
/root/code/spark/json
[root@master json]# mkdir -p src/main/scala && cd src/main/scala
[root@master scala]# vim testjson.scala
[root@master scala]# cat testjson.scala 
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON

object JSONApp {
    def main(args: Array[String]) {
        // Spark's bundled sample file: one JSON object per line
        val inputFile =  "file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json"
        val conf = new SparkConf().setAppName("JSONApp")
        val sc = new SparkContext(conf)
        val jsonStrs = sc.textFile(inputFile)
        // Parse each line independently; parseFull returns Option[Any]
        val result = jsonStrs.map(s => JSON.parseFull(s))
        result.foreach( {r => r match {
                        case Some(map: Map[String, Any]) => println(map)  // successfully parsed into a Map
                        case None => println("Parsing failed")
                        case other => println("Unknown data structure: " + other)
                }
        }
        )

    }
}
[root@master scala]# cd ../../..
[root@master json]# vim simple.sbt
[root@master json]# find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/testjson.scala
[root@master json]# cat simple.sbt 
name := "JSON Project"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
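
A note on the %% in simple.sbt: it appends the Scala binary version to the artifact name, so the dependency above resolves to spark-core_2.11. Spelled out with a plain %, it would be:

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"

The Scala version also fixes the output location: sbt builds the jar as target/scala-2.11/json-project_2.11-1.0.jar (the project name lower-cased and hyphenated, plus the Scala suffix and the version).
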
##################### Package with sbt ###################
[root@master json]# # Please make sure to set this directory as the current directory

[root@master json]# /usr/local/soft/mysbt2/sbt package
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
[info] Updated file /root/code/spark/json/project/build.properties: set sbt.version to 1.3.4
[info] Loading project definition from /root/code/spark/json/project
[info] Loading settings for project json from simple.sbt ...
[info] Set current project to JSON Project (in build file:/root/code/spark/json/)
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Compiling 1 Scala source to /root/code/spark/json/target/scala-2.11/classes ...
[warn] /root/code/spark/json/src/main/scala/testjson.scala:14:40: non-variable type argument String in type pattern scala.collection.immutable.Map[String,Any] (the underlying of Map[String,Any]) is unchecked since it is eliminated by erasure
[warn]                         case Some(map: Map[String, Any]) => println(map)
[warn]                                        ^
[warn] one warning found
[success] Total time: 13 s, completed 2019-12-4 23:25:27

#################### Submit with spark-submit #######################
[root@master json]# spark-submit --class "JSONApp" /root/code/spark/json/target/scala-2.11/json-project_2.11-1.0.jar 2>&1 | grep "Map("
Map(name -> Michael)
Map(name -> Andy, age -> 30.0)
Map(name -> Justin, age -> 19.0)
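
Two things worth noting in this output. First, parseFull carries no schema, so every JSON number comes back as a Double, which is why the ages print as 30.0 and 19.0. Second, the type-erasure warning sbt printed during compilation can be silenced with an @unchecked annotation on the pattern. A minimal sketch of extracting a typed field, runnable in spark-shell:

import scala.util.parsing.json.JSON

JSON.parseFull("""{"name":"Andy", "age":30}""") match {
  // @unchecked suppresses the "eliminated by erasure" warning seen above
  case Some(map: Map[String, Any] @unchecked) =>
    val age = map("age").asInstanceOf[Double].toInt  // JSON numbers parse as Double
    println(map("name") + " is " + age)              // prints: Andy is 30
  case _ => println("Parsing failed")
}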

HDFS read

scala> var textFile = sc.textFile("hdfs://192.168.0.11:9000/user/root/1.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.0.11:9000/user/root/1.txt MapPartitionsRDD[62] at textFile at <console>:24

scala> textFile.first()
res25: String = hadoop	hello

scala> textFile.saveAsTextFile("write")
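
The relative path "write" is resolved against the HDFS home directory of the current user, so the output ends up in hdfs://192.168.0.11:9000/user/root/write. The fully qualified equivalent:

// Same effect as saveAsTextFile("write") when running as root
textFile.saveAsTextFile("hdfs://192.168.0.11:9000/user/root/write")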

HBase

HBase is an open-source implementation of Google's Bigtable.

Data model

Timestamps are generated automatically and do not need to be managed by you; each write uses the latest timestamp. Old versions are kept around because a file written to HDFS can no longer be modified (HBase is built on top of HDFS).

So when using HBase, we only need to pin down three dimensions: row key, column family, and column qualifier.

A table can be split across multiple regions (because tables can be very large).
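
To make the three dimensions concrete, below is a minimal sketch against the HBase 1.1 client API; the student table, info column family, and cell values are hypothetical, chosen to match the read example further below:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// A cell is addressed by (row key, column family, column qualifier);
// the timestamp is assigned automatically when the cell is written.
val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = conn.getTable(TableName.valueOf("student"))
val put = new Put(Bytes.toBytes("1"))              // row key
put.addColumn(Bytes.toBytes("info"),               // column family
              Bytes.toBytes("name"),               // column qualifier
              Bytes.toBytes("Xueqian"))            // value
table.put(put)
table.close()
conn.close()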

HBase installation and configuration

Learning guide for the distributed database HBase

The download was really slow. Later I found I had already copied a big-data software bundle from a teacher, so I ended up installing HBase 1.1.5 from that directly.

I wrote my own installation tutorial:
Deploying HBase 1.1.x on Hadoop 2.6.0 (a distributed setup on three VM virtual machines)

HBase shell gripes

While typing, you cannot delete forward from the cursor; only Backspace and Del work, and they delete backward.

HBase operations

You can read this article directly:
Reading and writing HBase data

The example code is explained in Teacher Ziyu's online class.

[root@master scala-2.11]# spark-submit --driver-class-path /usr/local/soft/spark-2.1.0-bin-without-hadoop/jars/hbase/*:/usr/local/soft/hbase/conf --class "SparkOperateHBase"  ~/code/spark/hbase/target/scala-2.11/hbase-project_2.11-1.0.jar 2>&1 | grep "Row key:"
Row key:1 Name:Xueqian Gender:F Age:23
Row key:2 Name:Weiliang Gender:M Age:24
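
For reference, a sketch of what a program of this shape might look like, reading the table through newAPIHadoopRDD and TableInputFormat. The student table and info column family are assumptions inferred from the output above; see the linked article for the authoritative version:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object SparkOperateHBase {
  def main(args: Array[String]) {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")   // table to scan (assumed)
    val sc = new SparkContext(new SparkConf().setAppName("SparkOperateHBase"))
    // Each element is (row key, full row Result)
    val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    rdd.foreach { case (_, result) =>
      val key    = Bytes.toString(result.getRow)
      val name   = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val gender = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("gender")))
      val age    = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      println("Row key:" + key + " Name:" + name + " Gender:" + gender + " Age:" + age)
    }
  }
}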

Tags: JSON Scala Spark HBase

Posted on Wed, 29 Jan 2020 05:34:55 -0800 by dizel247