
SparkSQL (Part 1)

Posted: 2022-03-15 15:52

1. Overview

 

Components


Execution mechanism
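The figure that originally illustrated the execution pipeline is missing. In outline, a SQL query or DataFrame operation runs through Catalyst: it is parsed into a logical plan, analyzed against the catalog, optimized, turned into a physical plan, and finally executed via code generation. A minimal sketch of how to inspect these stages (assuming a SparkSession named `spark`, as in the examples later in this post):

// Sketch: print the Catalyst stages of a simple query.
// Assumes an existing SparkSession `spark`.
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.createOrReplaceTempView("t")

// explain(true) prints the parsed, analyzed and optimized logical plans,
// followed by the physical plan chosen by the planner.
spark.sql("SELECT id, name FROM t WHERE id > 1").explain(true)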


Reference (reposted): SparkSQL – 从0到1认识Catalyst ("Getting to know Catalyst from 0 to 1")

 

 

 

Higher efficiency


Query optimization


Optimization: move the filter earlier (predicate pushdown)
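As a hedged illustration (not from the original post): when a filter is written after a join, the optimizer pushes it below the join so rows are filtered before they are joined. Assuming the same `spark` session; `users` and `events` are made-up example DataFrames:

// Sketch: predicate pushdown - the Filter ends up beneath the Join in the optimized plan.
import spark.implicits._

val users  = Seq((1, "alice"), (2, "bob")).toDF("uid", "name")
val events = Seq((1, "click"), (2, "view")).toDF("uid", "action")

val joined = users.join(events, "uid").filter($"action" === "click")

// The optimized logical plan shows the Filter below the Join,
// even though it was written after it.
joined.explain(true)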

 

Data source optimization
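The same idea applies one level lower: filters and column selections can be pushed into the data source itself, so for example a Parquet scan reads only the required columns. A sketch with a hypothetical path:

// Sketch: column pruning and filter pushdown into a Parquet scan.
// "data/users.parquet" is a placeholder path.
val parquetUsers = spark.read.parquet("data/users.parquet")

// Only the selected columns are read, and the filter is pushed to the scan;
// explain() shows this as PushedFilters / ReadSchema on the FileScan node.
parquetUsers.select("uid", "name").filter("uid > 100").explain()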


Compile-time optimization: code generation
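Whole-stage code generation compiles a chain of operators into one generated Java function instead of interpreting them row by row. A small sketch (the feature is controlled by `spark.sql.codegen.wholeStage` and is on by default):

// Sketch: whole-stage code generation.
// Operators fused into a single generated function are marked with "*" (e.g. *(1) Filter)
// in the physical plan printed by explain().
val generated = spark.range(0, 1000000)
  .selectExpr("id", "id * 2 AS doubled")
  .filter("doubled > 10")
generated.explain()

// The controlling configuration (enabled by default):
println(spark.conf.get("spark.sql.codegen.wholeStage"))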


Dataset and DataFrame
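A DataFrame is just `Dataset[Row]`: untyped rows plus a schema checked at runtime, while a `Dataset[T]` carries a compile-time element type. A short sketch of moving between the two (e.g. pasted into spark-shell; `Person` is a made-up case class):

// Sketch: DataFrame vs. Dataset. `Person` is a hypothetical case class.
case class Person(name: String, age: Int)

import spark.implicits._

// DataFrame: Dataset[Row]; columns are referenced by name and checked at runtime.
val peopleDf = Seq(Person("alice", 30), Person("bob", 25)).toDF()
peopleDf.filter($"age" > 26).show()

// Dataset[Person]: the same data with a compile-time type.
val peopleDs = peopleDf.as[Person]
peopleDs.filter(_.age > 26).map(_.name).show()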


Data sources


Parquet files
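A sketch of reading and writing Parquet (paths are placeholders; Parquet is the default format and stores its schema in the files):

// Sketch: reading and writing Parquet. Paths are hypothetical.
val people = spark.read.parquet("data/people.parquet")
people.printSchema()
people.show()

// Write the DataFrame back out as Parquet.
people.write.mode("overwrite").parquet("output/people.parquet")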


JSON files
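A sketch of reading JSON (one JSON object per line by default, schema inferred; the path and field names are placeholders):

// Sketch: reading JSON. The path and the name/age fields are hypothetical.
val peopleJson = spark.read.json("data/people.json")
peopleJson.printSchema()

// A JSON-backed DataFrame is queried like any other.
peopleJson.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 20").show()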


Reading Hive tables
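Reading Hive tables requires a SparkSession built with Hive support (Hive dependencies and hive-site.xml on the classpath). A sketch; the table name is a placeholder:

// Sketch: a Hive-enabled session. "default.src" is a hypothetical table.
import org.apache.spark.sql.SparkSession

val sparkHive = SparkSession.builder()
  .appName("Hive Example")
  .master("local")
  .enableHiveSupport()
  .getOrCreate()

// Hive tables can then be queried with SQL or loaded via spark.table(...).
sparkHive.sql("SELECT * FROM default.src LIMIT 10").show()
val src = sparkHive.table("default.src")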


External data sources: spark.read.format
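The generic reader takes a format name plus options. A sketch; every path, URL and credential below is a placeholder:

// Sketch: spark.read.format for CSV and JDBC. All option values are hypothetical.
val csvDf = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/people.csv")

val jdbcDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")  // placeholder connection URL
  .option("dbtable", "people")                        // placeholder table name
  .option("user", "root")
  .option("password", "******")
  .load()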


2. Program design

Typical workflow
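The figure showing the typical flow is missing, but the flow is the one every example below follows: build a SparkSession, read data into a DataFrame, register a temp view, query with SQL (or the DSL), trigger execution with an action, stop the session. A skeleton of that structure:

// Sketch of the common program structure used in this section. The path is a placeholder.
import org.apache.spark.sql.SparkSession

object Skeleton {
  def main(args: Array[String]): Unit = {
    // 1. Build the entry point.
    val spark = SparkSession.builder().appName("Example").master("local").getOrCreate()
    import spark.implicits._

    // 2. Load data into a DataFrame.
    val df = spark.read.json("data/input.json")

    // 3. Register a temp view and query it with SQL.
    df.createOrReplaceTempView("records")
    val result = spark.sql("SELECT COUNT(*) AS cnt FROM records")

    // 4. Trigger execution with an action, then stop the session.
    result.show()
    spark.stop()
  }
}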


API: SQL vs. the DataFrame DSL
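The same query can be written as a SQL string against a temp view or with the DataFrame DSL; both produce the same Catalyst plan. A sketch using the `logs` view and `accessLogs` DataFrame defined in the examples below:

// Sketch: one aggregation, written twice.
// Assumes the `logs` temp view / `accessLogs` DataFrame from the examples that follow.
import org.apache.spark.sql.functions._

// SQL
val bySql = spark.sql(
  "SELECT responseCode, COUNT(*) AS total FROM logs GROUP BY responseCode")

// DataFrame DSL
val byDsl = accessLogs
  .groupBy("responseCode")
  .agg(count("*").as("total"))

bySql.show()
byDsl.show()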


Content size statistics: total content size, number of log lines, minimum content size, maximum content size

package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")

    // Content size statistics: total size, number of log lines, minimum size, maximum size
    val contentSizeStats: Row = spark.sql("SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM logs").first()
    val sum = contentSizeStats.getLong(0)
    val count = contentSizeStats.getLong(1)
    val min = contentSizeStats.getLong(2)
    val max = contentSizeStats.getLong(3)
    println("sum %s, count %s, min %s, max %s".format(sum, count, min, max))
    println("avg %s", sum / count)
    spark.close()


  }
}

ApacheAccessLog (case class and log-line parser)

package org.sparkcourse.log


case class ApacheAccessLog(ipAddress: String,
                           clientIdentd: String,
                           userId: String,
                           dateTime: String,
                           method: String,
                           endpoint: String,
                           protocol: String,
                           responseCode: Int,
                           contentSize: Long){
}

object ApacheAccessLog {
  // 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
  val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s+\-\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  def parseLogLine(log: String): ApacheAccessLog = {
    log match {
      case PATTERN(ipAddress, clientIdentd, userId, dateTime, method, endpoint, protocol, responseCode, contentSize)
        => ApacheAccessLog(ipAddress, clientIdentd, userId, dateTime, method, endpoint, protocol, responseCode.toInt, contentSize.toLong)
      case _ => throw new RuntimeException(s"""Cannot parse log line: $log""")
    }
  }
}

 

Count the number of occurrences of each response code

package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")



    // Count the number of occurrences of each response code.
    val responseCodeToCount = spark.sql("SELECT responseCode, COUNT(*) FROM logs GROUP BY responseCode LIMIT 100")
      .map(row => (row.getInt(0), row.getLong(1)))
      .collect()
    responseCodeToCount.foreach(println(_))
    spark.close()
  }
}

 

Find which IP addresses accessed the server more than 10 times

package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")





    // Find which IP addresses accessed the server more than 10 times
    val ipAddresses = spark.sql("SELECT ipAddress, COUNT(*) AS total FROM logs GROUP BY ipAddress HAVING total > 10 LIMIT 100")
      .map(row => row.getString(0))
      .collect()
    ipAddresses.foreach(println(_))
    spark.close()
  }
}

 

Find the most frequently accessed endpoints

package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")




    // Find the most frequently accessed endpoints
    val topEndpoints = spark.sql("SELECT endpoint, COUNT(*) AS total FROM logs GROUP BY endpoint ORDER BY total DESC LIMIT 10")
      .map(row => (row.getString(0), row.getLong(1)))
      .collect()
    topEndpoints.foreach(println(_))
    spark.close()
  }
}

 
