阜阳专业网站建设企业网站模板 免费
文章目录
- 零、本讲学习目标
- 一、使用Spark SQL实现词频统计
- (一)数据源 - words.txt
- (二)创建Maven项目
- (三)添加依赖和构建插件
- (四)修改源目录名称
- (五)创建日志属性文件
- (六)创建词频统计单例对象
- (七)启动程序,查看结果
- (八)词频统计数据转化流程图
零、本讲学习目标
- 使用Spark SQL实现词频统计
- 使用Spark SQL计算总分与平均分
- 使用Spark SQL统计每日新增用户
- 使用Spark SQL实现分组排行榜
- 使用Spark SQL进行智慧交通数据分析
一、使用Spark SQL实现词频统计
(一)数据源 - words.txt
(二)创建Maven项目
- 创建Maven项目 -
SparkSQLWordCount
(三)添加依赖和构建插件
- 在
pom.xml
文件里添加依赖和构建插件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.hw.wc</groupId><artifactId>SparkSQLWordCount</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.1.1</version></dependency></dependencies><build> <plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.3.2</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build>
</project>
(四)修改源目录名称
- 将源目录名由
java
改成scala
- 在
pom.xml
文件里,设置源目录
(五)创建日志属性文件
- 在resources目录里创建
log4j.properties
文件
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spark.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(六)创建词频统计单例对象
- 创建
net.hw.wc
包,在包里创建SparkSQLWordCount
单例对象
package net.hw.wcimport org.apache.spark.sql.{Dataset, SparkSession}/*** 功能:利用Spark SQL实现词频统计* 作者:华卫* 日期:2022年05月15日*/
object SparkSQLWordCount {def main(args: Array[String]): Unit = {// 设置HADOOP用户名属性,否则本地运行访问会被拒绝System.setProperty("HADOOP_USER_NAME", "root")// 创建或得到SparkSessionval spark = SparkSession.builder().appName("SparkSQLWordCount").master("local[*]").getOrCreate()// 读取HDFS上的单词文件val lines: Dataset[String] = spark.read.textFile("hdfs://master:9000/input/words.txt")// 显示数据集lines内容lines.show()// 导入Spark会话对象的隐式转换import spark.implicits._// 将数据集中的数据按空格切分并合并val words: Dataset[String] = lines.flatMap(_.split(" "))// 显示数据集words内容words.show()// 将数据集默认列名由value改为word,并转换成数据帧val df = words.withColumnRenamed("value", "word").toDF()// 显示数据帧内容df.show()// 基于数据帧创建临时视图df.createTempView("v_words")// 执行SQL分组查询,实现词频统计val wc = spark.sql("""| select word, count(*) as count| from v_words group by word| order by count desc|""".stripMargin)// 显示词频统计结果wc.show()// 关闭会话spark.close()}
}
(七)启动程序,查看结果
- 运行
SparkSQLWordCount
单例对象
(八)词频统计数据转化流程图
- 文本文件,转化成数据集,再转化成数据帧,最后基于表查询得到结果数据帧