How to read and write Aliyun HBase using MaxCompute Spark

Background

Spark on MaxCompute can access instances (e.g. ECS, HBase, RDS) inside a VPC on Alibaba Cloud. By default the underlying MaxCompute network is isolated from external networks, so Spark on MaxCompute provides the configuration spark.hadoop.odps.cupid.vpc.domain.list to reach an HBase instance running in an Alibaba Cloud VPC. The configuration differs between the Standard and Enhanced editions of HBase; this article describes the configuration needed to access each edition.

HBase Standard Edition

Environment preparation
HBase runs inside a VPC, so first add security group rules opening ports 2181, 16000, and 16020. HBase also enforces an IP whitelist, so the corresponding MaxCompute IP range must be added to HBase's whitelist.
Set the security group for the corresponding VPC

Find the corresponding VPC ID and open the ports above in the security group settings


Add the whitelist for HBase

Add the following IP range to HBase's whitelist:

100.104.0.0/16

Create the HBase table

create 'test','cf'
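After the Spark job later in this section has run, the write can be sanity-checked from the HBase shell on a machine inside the same VPC (an illustrative session, not part of the original walkthrough):

```
echo "scan 'test'" | hbase shell
```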

Write the Spark program
Required HBase dependencies:

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-mapreduce</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>com.aliyun.hbase</groupId>
  <artifactId>alihbase-client</artifactId>
  <version>2.0.5</version>
</dependency>

Write the code:

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("HbaseTest")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

    val sc = spark.sparkContext
    val config = HBaseConfiguration.create()
    val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
    config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")


    try {
      spark.sql("select '7', 88").rdd.map(row => {
        val name= row(0).asInstanceOf[String]
        val id = row(1).asInstanceOf[Integer]
        val put = new Put(Bytes.toBytes(id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
        (new ImmutableBytesWritable, put)
      }).saveAsHadoopDataset(jobConf)
    } finally {
      sc.stop()
    }
  }
}
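The title also covers reading. A minimal read sketch for the same table, using the mapreduce TableInputFormat from the hbase-mapreduce dependency declared above, might look like the following (assumptions: sc and zkAddress as defined in the program above; not tested against a live cluster):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

// Reuse the SparkContext (sc) and ZooKeeper address (zkAddress) from the program above
val readConf = HBaseConfiguration.create()
readConf.set("hbase.zookeeper.quorum", zkAddress)
readConf.set(TableInputFormat.INPUT_TABLE, "test")

// Each record is a (row key, Result) pair
val hbaseRdd = sc.newAPIHadoopRDD(
  readConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

// Print every row key that was written
hbaseRdd.map { case (_, result) => Bytes.toString(result.getRow) }
  .collect()
  .foreach(println)
```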

Submit to DataWorks
Because the jar exceeds DataWorks' 50 MB upload limit, upload it through the odps client:

add jar SparkHbase-1.0-SNAPSHOT.jar -f;

In Data Development, create a new Spark node


Add the configuration
The configuration spark.hadoop.odps.cupid.vpc.domain.list is required.
The list must include the domain names of all HBase machines; leaving even one out can make the network unreachable.

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            }
          ]
        }
      ]
    }
  ]
}
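When this is added to the Spark node's configuration in DataWorks, the JSON is typically supplied as the property's value on a single line, for example (shortened with ... for readability):

```
spark.hadoop.odps.cupid.vpc.domain.list={"regionId":"cn-beijing","vpcs":[{"vpcId":"vpc-2zeaeq21mb1dmkqh0exox","zones":[{"urls":[{"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com","port":2181},...]}]}]}
```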


HBase Enhanced Edition

Environment preparation
The HBase Enhanced Edition ports are 30020, 16000, and 16020. HBase also enforces an IP whitelist, so the corresponding MaxCompute IP range must be added to HBase's whitelist.
Set the security group for the corresponding VPC
Find the corresponding VPC ID and open the ports above in the security group settings

Add the following IP range to HBase's whitelist:

100.104.0.0/16

Create the HBase table

create 'test','cf'

Write the Spark program
Required HBase dependency; the referenced package must be the Alibaba Cloud Enhanced Edition client:

<dependency>
  <groupId>com.aliyun.hbase</groupId>
  <artifactId>alihbase-client</artifactId>
  <version>2.0.8</version>
</dependency>

Write the code:

import java.util

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object McToHbase {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("spark_sql_ddl")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point", "http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

    val sc = spark.sparkContext

    try {
      spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
        // Create the HBase client inside foreachPartition so it is never serialized
        val config = HBaseConfiguration.create()
        // Cluster connection address (VPC intranet address), from the database
        // connection page of the console
        config.set("hbase.zookeeper.quorum", ":30020")
        // Username and password for the cluster (defaults unless changed in the console)
        config.set("hbase.client.username", "")
        config.set("hbase.client.password", "")
        val tableName = TableName.valueOf("test")
        val conn = ConnectionFactory.createConnection(config)
        val table = conn.getTable(tableName)
        try {
          val puts = new util.ArrayList[Put]()
          iter.foreach { row =>
            val id = row(0).asInstanceOf[String]
            val name = row(1).asInstanceOf[String]
            val put = new Put(Bytes.toBytes(id))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
            puts.add(put)
          }
          // Write the whole batch once, instead of re-sending the growing list per row
          table.put(puts)
        } finally {
          table.close()
          conn.close()
        }
      }
    } finally {
      sc.stop()
    }
  }
}

Be careful
The HBase client may throw org.apache.spark.SparkException: Task not serializable.
This happens because Spark serializes the objects captured by a closure in order to ship them to the workers.
Solutions:

- Make the class serializable.
- Declare the instance only inside the lambda passed to map.
- Make the NotSerializable object a static object, created once per machine.
- Call rdd.foreachPartition and create the object there.

The serializable version looks like this:

rdd.foreachPartition(iter -> {
    NotSerializable notSerializable = new NotSerializable();
    // ...now handle iter
});


Submit to DataWorks
Because the jar exceeds DataWorks' 50 MB upload limit, upload it through the odps client:

add jar SparkHbase-1.0-SNAPSHOT.jar -f;

In Data Development, create a new Spark node


Add the configuration
The configuration spark.hadoop.odps.cupid.vpc.domain.list is required.
Be careful:
1. The Enhanced Edition's Java API access address must be given here as an IP address. Obtain the IP by pinging the access address directly; here the IP is 172.16.0.10, added with port 16000.

2. The list must include the domain names of all HBase machines; leaving even one out can make the network unreachable.

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
             {"domain":"172.16.0.10","port":16000}
          ]
        }
      ]
    }
  ]
}

Tags: Mobile HBase Spark VPC Hadoop

Posted on Mon, 01 Jun 2020 23:53:10 -0700 by Assorro