乐鱼电竞

  • 教育行业A股IPO第一股(股票代码 003032)

    全国咨询/投诉热线:400-618-4000

    Spark SQL通过JDBC连接MySQL读写数据

    更新时间:2015年12月29日16时01分 来源:乐鱼播客云计算学科 浏览次数:

    Spark SQL通过JDBC连接MySQL读写数据

    Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
    一.从MySQL中加载数据(Spark Shell方式)
    1.启动Spark Shell,必须指定mysql连接驱动jar包
    /usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \
    --master spark://node1.itcast.cn:7077 \
    --jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
    --driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar

    2.从mysql中加载数据
    val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.10.1:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "123456")).load()

    3.执行查询
    jdbcDF.show()
     
    二.将数据写入到MySQL中(打jar包方式)
    1.编写Spark SQL程序
    package cn.itcast.spark.sql

    import java.util.Properties
    import org.apache.spark.sql.{SQLContext, Row}
    import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}

    object JdbcRDD {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("MySQL-Demo")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        //通过并行化创建RDD
        val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
        //通过StructType直接指定每个字段的schema
        val schema = StructType(
          List(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)
          )
        )
        //将RDD映射到rowRDD
        val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
        //将schema信息应用到rowRDD上
        val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
        //创建Properties存储数据库相关属性
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "123456")
        //将数据追加到数据库
        personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.10.1:3306/bigdata", "bigdata.person", prop)
        //停止SparkContext
        sc.stop()
      }
    }


    2.用maven将程序打包

    3.将Jar包提交到spark集群
    /usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
    --class cn.itcast.spark.sql.JdbcRDD \
    --master spark://node1.itcast.cn:7077 \
    --jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
    --driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
    /root/spark-mvn-1.0-SNAPSHOT.jar

    0 分享到:
    和我们在线交谈!
    【网站地图】【sitemap】