Before Spark 2.0, SQLContext was the entry point in Spark SQL for creating DataFrames and executing SQL, and the HiveContext interface let us query Hive table data with HiveQL statements. Since Spark 2.0, Spark uses the new unified SparkSession interface in place of SQLContext and HiveContext for loading, transforming, and processing data.
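In a standalone application (as opposed to spark-shell, introduced below), the SparkSession has to be built explicitly. A minimal sketch, with a placeholder application name:

import org.apache.spark.sql.SparkSession

// Unified entry point since Spark 2.0, replacing SQLContext/HiveContext
val spark = SparkSession.builder()
  .appName("SparkSQLDemo")   // placeholder application name
  .master("local[2]")        // placeholder; usually set via spark-submit instead
  .getOrCreate()

// For Hive table access, add .enableHiveSupport() before getOrCreate()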
A SparkSession object can be obtained through the SparkSession.builder().getOrCreate() method. When we write programs in spark-shell, however, the shell already provides a SparkContext object named "sc" and a SparkSession object named "spark" by default, so we can use them directly without creating our own. The command to start spark-shell is shown below.
$ spark-shell --master local[2]
After spark-shell starts, the output looks like Figure 1.

Figure 1 Starting spark-shell
ÔÚͼ1ÖпÉÒÔ¿´³ö£¬SparkContext¡¢SparkSession¶ÔÏóÒÑ´´½¨Íê³É¡£´´½¨DataFrameÓжàÖÖ·½Ê½£¬×î»ù±¾µÄ·½Ê½ÊÇ´ÓÒ»¸öÒѾ´æÔÚµÄRDDµ÷ÓÃtoDF()·½·¨½øÐÐת»»µÃµ½DataFrame£¬»òÕßͨ¹ýSpark¶ÁÈ¡Êý¾ÝÔ´Ö±½Ó´´½¨¡£
Before creating a DataFrame, to support converting an RDD into a DataFrame and the SQL operations that follow, we need to import spark.implicits._ to enable implicit conversions. When creating a DataFrame through a SparkSession, we can use spark.read to load data from files of different formats; the main APIs are listed in Table 1.
Table 1 spark.read operations
| Code example | Description |
|---|---|
| spark.read.text("people.txt") | Reads a plain-text file and creates a DataFrame |
| spark.read.csv("people.csv") | Reads a CSV-format file and creates a DataFrame |
| spark.read.json("people.json") | Reads a JSON-format file and creates a DataFrame |
| spark.read.parquet("people.parquet") | Reads a Parquet file and creates a DataFrame |
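The shorthand methods in Table 1 delegate to Spark's generic DataFrameReader API, which also accepts per-format options. A brief sketch, reusing the placeholder file names from Table 1:

// Long form: equivalent to spark.read.json("people.json")
val jsonDF = spark.read.format("json").load("people.json")

// Readers accept options, e.g. use the first CSV line as the header row
val csvDF = spark.read.option("header", "true").csv("people.csv")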
1. Data preparation
There is a person.txt file in the /spark directory of the HDFS file system, with the contents shown in File 1.
File 1 person.txt
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 jerry 40
2. Creating a DataFrame directly from a file
We create the DataFrame by having Spark read the data source. Enter the following code in spark-shell:
scala> val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: string]
scala> personDF.printSchema()
root
 |-- value: string (nullable = true)
The returned type of personDF shows that the DataFrame object was created. Calling the DataFrame's printSchema() method then prints the object's schema metadata: the value field has type string and is nullable.
The DataFrame's show() method displays the current DataFrame's result data; the code and output are shown below.
scala> personDF.show()
+-------------+
|        value|
+-------------+
|1 zhangsan 20|
|    2 lisi 29|
|  3 wangwu 25|
| 4 zhaoliu 30|
|  5 tianqi 35|
|   6 jerry 40|
+-------------+
As the output shows, the six records in personDF correspond to the six lines of the person.txt text file, each read into the single value column.
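Because spark.read.text always produces a single value column, splitting the fields is left to us (step 3 below does this through an RDD). As an aside, a space-delimited file like this one can also be parsed into columns directly by the CSV reader; a minimal sketch, where the column names id, name, and age are our own choice rather than anything stored in the file:

// Parse the space-delimited file into typed columns in one step
val parsedDF = spark.read
  .option("sep", " ")              // split on spaces instead of commas
  .option("inferSchema", "true")   // let Spark infer the integer columns
  .csv("/spark/person.txt")
  .toDF("id", "name", "age")       // rename the default _c0/_c1/_c2 columns
parsedDF.printSchema()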
3. Converting an RDD into a DataFrame
Calling the RDD's toDF() method converts the RDD into a DataFrame object, as the following code shows.
scala> val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:24
scala> case class Person(id: Int, name: String, age: Int)
defined class Person
scala> val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map at <console>:27
scala> val personDF = personRDD.toDF()
personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> personDF.show
+----+--------+----+
|  id|    name| age|
+----+--------+----+
|   1|zhangsan|  20|
|   2|    lisi|  29|
|   3|  wangwu|  25|
|   4| zhaoliu|  30|
|   5|  tianqi|  35|
|   6|   jerry|  40|
+----+--------+----+
scala> personDF.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
ÔÚÉÏÊö´úÂëÖУ¬µÚ1ÐдúÂ뽫Îı¾Îļþת»»³ÉRDD£¬µÚ4ÐдúÂ붨ÒåPersonÑùÀýÀ࣬Ï൱ÓÚ¶¨Òå±íµÄSchemaÔªÊý¾ÝÐÅÏ¢£¬µÚ6ÐдúÂë±íʾʹRDDÖеÄÊý×éÊý¾ÝÓëÑùÀýÀà½øÐйØÁª£¬×îÖջὫRDD[Array[String]]¸ü¸ÄΪRDD[Person]£¬µÚ9ÐдúÂë±íʾµ÷ÓÃRDDµÄtoDF()·½·¨£¬¾Í¿ÉÒÔ°ÑRDDת»»³ÉÁËDataFrameÁË¡£µÚ12-27ÐдúÂë±íʾµ÷ÓÃDataFrame·½·¨²¢´Ó·µ»Ø½á¹û¿ÉÒÔ¿´³ö£¬RDD¶ÔÏó³É¹¦×ª»»DataFrame¡£