人民教育出版社,安庆天气预报,免疫球蛋白-库里疯狂三分集锦

admin 2个月前 ( 12-19 01:07 ) 0条评论
摘要: 4.2.1 HDFS Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有...

4.2.1 HDFS

Spark的整个生态体系与Hadoop是彻底兼容的,所以关于Hadoop所支撑的文件类型或许数据库类型,Spark也相同支撑.别的,因为Hadoop的API有新旧两个版别,所以Spark为了能够兼容Hadoop一切的版别,也供给了两套创立操作接口.关于外部存储创立操作而言,hadoopRDD和newHadoopRDD是最为笼统的两个函数接口,首要包括以下四个参数.

1)输入格局(InputFormat)公民教育出版社,安庆天气预报,免疫球蛋白-库里张狂三分集锦: 拟定数据输入的类型,如TextInputFormat等,新旧两个版别所引证的版别分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(梁梓靖NewInputFor公民教育出版社,安庆天气预报,免疫球蛋白-库里张狂三分集锦mat)

2)键类型: 指定[K,V]键值对中K的类型

3)值类型: 指定[K,V]键值对中V的类型

4)分区值: 指定由外部存储生成的RDD的partition数量的最小值,假如没有指定,体系会运用默认值defaultMinSplits

留意:其他创立操作的API接口都是为了便利终究的Spark程序开发者而设置的,是这两个接口的高效完成版别.例如,关于textFile而言,只要path这个指定文件途径的参数色欲后宫,其他参数在体系内部指定了默认值。

1.在Hadoop中以紧缩办法存储的数据,不需求指定解压办法就能够进行读取,因为Hadoop自身有一个解压器会依据紧缩文件的后缀揣度解压算法进行解压.

2.假如用Spark从Hadoop中读取某种类型的数据不知道怎样读岛田绅助取的时分,上网查找一个运用map-reduce的时分是怎样读取这种这种数据的,然后再将对应的读取办法改写成上面的hadoopRDD和newAPIHadoopRDD两个类就行了

4.2.2 MySQL数据库衔接

支撑经过Java JDBC拜访联系型数据库。需求经过JdbcRDD进行,示例如下:

(1)增加依靠


mysql
mysql-connector-java
5.1.27

(2)Mysql读取:

object MysqlRDD { 
def main(args: Array[String]): Unit = {
//1.创立spark第一页装备信息
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//2.创立SparkContext
v少女映画在线al sc = new SparkContext(sparkConf)

//3.界说衔接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql:脚故事//hadoop102:3306/rdd"
val userName = "root"
val pa公民教育出版社,安庆天气预报,免疫球蛋白-库里张狂三分集锦ssWd = "000000"

//创立板凳哥JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select * from `rddtable` where `id`>=?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
)

//打印最终成果
println(rdd.count())
rdd.foreach(println)

sc.stop()
}
}

Mysql写入:

def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(spa王炫哲rkConf)
val data = sc.parallelize(List("Female", "Male","Female"))

data.foreachPartition(insert圈养小倌Data)
}

def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000")
iterator.foreach(data => {
val ps = conn.prepareStat武当三丰太极剑55式ement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.e夫人电影xecuteUpdate()
})
}

4.2.3 HBase数据库

因为 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的完成,Spark 能够经过Hadoop输入格局拜访HBase。这个输入格局会回来键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为org衡阳保卫战电视剧全集.apache.hadoop.hbase.client.Result。

(1)增加依靠

 
org.apache.hbase
hbase-server
1.3.1



org.apache.hbase
hbase-client
1.3.1

(2)从HBase读取数据

object HBaseSpark {
def main(args: Array[String]): Unit = {
//创立spark装备信息
val sparkConf公民教育出版社,安庆天气预报,免疫球蛋白-库里张狂三分集锦: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//创立SparkContext
val sc = new SparkContext(sparkConf)
//构建HBase装备信息
val co公民教育出版社,安庆天气预报,免疫球蛋白-库里张狂三分集锦nf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
//从HBase读取数据构成RDD
val hbaseRDD: RDD[(ImmutableBytesWritable, 冒牌特工队Result)] = sc.newAPIHadoopRDD(
conf,
classkorea1818Of[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
val count: Long = hbaseRDD.count()
println(count)
//对hbaseRDD进行处理
hbaseRDD.foreach {
case (_, result) =>
val key: String = Bytes.toString(result.getRow)
val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("color")))
println("RowKey:" + key + ",Name:" + name + ",Color:" + color)
}
//封闭衔接
sc.stop()
}
}

3)往HBase写入

def main(args: Array[String]) {
//获取Spark装备信息并创立与spark的衔接
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp")
val公民教育出版社,安庆天气预报,免疫球蛋白-库里张狂三分集锦 sc = new SparkContext(sparkConf)

//创立HBaseConf
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.sdlidlietOutputFormat(classOf[TableOutputFormat])公民教育出版社,安庆天气预报,免疫球蛋白-库里张狂三分集锦
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")

//邱培龙构建Hbase表描绘器
val fruitTable = TableName.valueOf("fruit_spark")
val tableDescr = new HTableDescriptor(fruitTable)
tableDe脐交scr.addFamily(new HColumnDescriptor("info".getBytes))

//创立Hbase表
val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
}
admin.createTable(tableDescr)

//界说往Hbase刺进数据的办法
def convert(triple: (Int, String, 水浒天行Int)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
愿望深渊put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}

//创立一个RDD
val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))

//将RDD内容写到HBase
val localData = initialRDD.map(convert)

localData.saveAsHadoopDataset(jobConf)
}

(本文为系列文章,重视作者阅览其它部分内容,总有一篇是你短缺的,技能无止境,且学且爱惜!!!)

文章版权及转载声明:

作者:admin本文地址:http://www.enkura.com/articles/4965.html发布于 2个月前 ( 12-19 01:07 )
文章转载或复制请以超链接形式并注明出处库里疯狂三分集锦