当前位置: 首页 > news >正文

取公司名字大全免费查询长沙专业seo优化推荐

取公司名字大全免费查询,长沙专业seo优化推荐,新沂做网站,网站建设论坛社区文章目录 一、使用 Java API 和 JavaRDD<Row> 在 Spark SQL 中向数据帧添加新列二、foreachPartition 遍历 Dataset三、Dataset 自定义 Partitioner四、Dataset 重分区并且获取分区数 一、使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列 在应用 mapPartition…

文章目录

      • 一、使用 Java API 和 JavaRDD<Row> 在 Spark SQL 中向数据帧添加新列
      • 二、foreachPartition 遍历 Dataset
      • 三、Dataset 自定义 Partitioner
      • 四、Dataset 重分区并且获取分区数

一、使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列

  在应用 mapPartition 函数后创建一个新的数据框:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;public class Handler implements Serializable {public void handler(Dataset<Row> sourceData) {Dataset<Row> rowDataset = sourceData.where("rowKey = 'abcdefg_123'").selectExpr("split(rowKey, '_')[0] as id","name","time").where("name = '小强'").orderBy(functions.col("id").asc(), functions.col("time").desc());FlatMapFunction<Iterator<Row>,Row> mapPartitonstoTime = rows->{Int count = 0; // 只能在每个分区内自增,不能保证全局自增String startTime = "";String endTime = "";List<Row> mappedRows=new ArrayList<Row>();while(rows.hasNext()){count++;Row next = rows.next();String id = next.getAs("id");if (count == 2) {startTime = next.getAs("time");endTime = next.getAs("time");}Row mappedRow= RowFactory.create(next.getString(0), next.getString(1), next.getString(2), endTime, startTime);mappedRows.add(mappedRow);}return mappedRows.iterator();};JavaRDD<Row> sensorDataDoubleRDD=rowDataset.toJavaRDD().mapPartitions(mapPartitonstoTime);StructType oldSchema=rowDataset.schema();StructType newSchema =oldSchema.add("startTime",DataTypes.StringType,false).add("endTime",DataTypes.StringType,false);System.out.println("The new schema is: ");newSchema.printTreeString();System.out.println("The old schema is: ");oldSchema.printTreeString();Dataset<Row> sensorDataDoubleDF=spark.createDataFrame(sensorDataDoubleRDD, newSchema);sensorDataDoubleDF.show(100, false);}
}

打印结果:

The new schema is: 
root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- time: string (nullable = true)The old schema is: 
root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- time: string (nullable = true)|-- startTime: string (nullable = true)|-- endTime: string (nullable = true)+-----------+---------+----------+----------+----------+
|id         |name     |time      |startTime |endTime   |
+-----------+---------+----------+----------+----------+
|abcdefg_123|xiaoqiang|1693462023|1693462023|1693462023|
|abcdefg_321|xiaoliu  |1693462028|1693462028|1693462028|
+-----------+---------+----------+----------+----------+

参考:
java - 使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列
java.util.Arrays$ArrayList cannot be cast to java.util.Iterator

二、foreachPartition 遍历 Dataset

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;public class Handler implements Serializable {public void handler(Dataset<Row> sourceData) {JavaRDD<Row> dataRDD = rowDataset.toJavaRDD();dataRDD.foreachPartition(new VoidFunction<Iterator<Row>>() {@Overridepublic void call(Iterator<Row> rowIterator) throws Exception {while (rowIterator.hasNext()) {Row next = rowIterator.next();String id = next.getAs("id");if (id.equals("123")) {String startTime = next.getAs("time");// 其他业务逻辑}}}});// 转换为 lambda 表达式dataRDD.foreachPartition((VoidFunction<Iterator<Row>>) rowIterator -> {while (rowIterator.hasNext()) {Row next = rowIterator.next();String id = next.getAs("id");if (id.equals("123")) {String startTime = next.getAs("time");// 其他业务逻辑}}});}
}

三、Dataset 自定义 Partitioner

参考:spark 自定义 partitioner 分区 java 版

import org.apache.commons.collections.CollectionUtils;
import org.apache.spark.Partitioner;
import org.junit.Assert;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** Created by lesly.lai on 2018/7/25.*/
public class CuxGroupPartitioner extends Partitioner {private int partitions;/*** map<key, partitionIndex>* 主要为了区分不同分区*/private Map<Object, Integer> hashCodePartitionIndexMap = new ConcurrentHashMap<>();public CuxGroupPartitioner(List<Object> groupList) {int size = groupList.size();this.partitions = size;initMap(partitions, groupList);}private void initMap(int size, List<Object> groupList) {Assert.assertTrue(CollectionUtils.isNotEmpty(groupList));for (int i=0; i<size; i++) {hashCodePartitionIndexMap.put(groupList.get(i), i);}}@Overridepublic int numPartitions() {return partitions;}@Overridepublic int getPartition(Object key) {return hashCodePartitionIndexMap.get(key);}public boolean equals(Object obj) {if (obj instanceof CuxGroupPartitioner) {return ((CuxGroupPartitioner) obj).partitions == partitions;}return false;}
}

查看分区分布情况工具类:
(1)Scala:

import org.apache.spark.sql.{Dataset, Row}/*** Created by lesly.lai on 2017/12FeeTask/25.*/
class SparkRddTaskInfo {def getTask(dataSet: Dataset[Row]) {val size = dataSet.rdd.partitions.lengthprintln(s"==> partition size: $size " )import scala.collection.Iteratorval showElements = (it: Iterator[Row]) => {val ns = it.toSeqimport org.apache.spark.TaskContextval pid = TaskContext.get.partitionIdprintln(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")}dataSet.foreachPartition(showElements)}
}

(2)Java:

import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;public class SparkRddTaskInfo {public static void getTask(Dataset<Row> dataSet) {int size = dataSet.rdd().partitions().length;System.out.println("==> partition size:" + size);JavaRDD<Row> dataRDD = dataSet.toJavaRDD();dataRDD.foreachPartition((VoidFunction<Iterator<Row>>) rowIterator -> {List<String> mappedRows = new ArrayList<String>();int count = 0;while (rowIterator.hasNext()) {Row next = rowIterator.next();String id = next.getAs("id");String partitionKey = next.getAs("partition_key");String name = next.getAs("name");mappedRows.add(id + "/" + partitionKey+ "/" + name);}int pid = TaskContext.get().partitionId();System.out.println("[partition: " + pid + "][size: " + mappedRows.size() + "]" + mappedRows);});}
}

调用方式:

import com.vip.spark.db.ConnectionInfos;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;import java.util.List;
import java.util.stream.Collectors;/*** Created by lesly.lai on 2018/7/23.*/
public class SparkSimpleTestPartition {public static void main(String[] args) throws InterruptedException {SparkSession sparkSession = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();// 原始数据集Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties());originSet.selectExpr("split(rowKey, '_')[0] as id","concat(split(rowKey, '_')[0],'_',split(rowKey, '_')[1]) as partition_key","split(rowKey, '_')[1] as name".createOrReplaceTempView("people");// 获取分区分布情况工具类SparkRddTaskInfo taskInfo = new SparkRddTaskInfo();Dataset<Row> groupSet = sparkSession.sql(" select partition_key from people group by partition_key");List<Object> groupList = groupSet.javaRDD().collect().stream().map(row -> row.getAs("partition_key")).collect(Collectors.toList());// 创建pairRDD 目前只有pairRdd支持自定义partitioner,所以需要先转成pairRddJavaPairRDD pairRDD = originSet.javaRDD().mapToPair(row -> {return new Tuple2(row.getAs("partition_key"), row);});// 指定自定义partitionerJavaRDD javaRdd = pairRDD.partitionBy(new CuxGroupPartitioner(groupList)).map(new Function<Tuple2<String, Row>, Row>(){@Overridepublic Row call(Tuple2<String, Row> v1) throws Exception {return v1._2;}});Dataset<Row> result = sparkSession.createDataFrame(javaRdd, originSet.schema());// 打印分区分布情况taskInfo.getTask(result);}
}

四、Dataset 重分区并且获取分区数

        System.out.println("1-->"+rowDataset.rdd().partitions().length);System.out.println("1-->"+rowDataset.rdd().getNumPartitions());Dataset<Row> hehe = rowDataset.coalesce(1);System.out.println("2-->"+hehe.rdd().partitions().length);System.out.println("2-->"+hehe.rdd().getNumPartitions());

运行结果:

1-->29
1-->29
2-->2
2-->2

注意:在使用 repartition() 时两次打印的结果相同:

print(rdd.getNumPartitions())
rdd.repartition(100)
print(rdd.getNumPartitions())

产生上述问题的原因有两个:
  首先 repartition() 是惰性求值操作,需要执行一个 action 操作才可以使其执行。
  其次,repartition() 操作会返回一个新的 rdd,并且新的 rdd 的分区已经修改为新的分区数,因此必须使用返回的 rdd,否则将仍在使用旧的分区。
  修改为:rdd2 = rdd.repartition(100)

参考:repartition() is not affecting RDD partition size


文章转载自:
http://neckwear.ptzf.cn
http://neoconservative.ptzf.cn
http://cigar.ptzf.cn
http://pertinacious.ptzf.cn
http://asthmatic.ptzf.cn
http://tinctorial.ptzf.cn
http://pericarditis.ptzf.cn
http://dragonhead.ptzf.cn
http://aluminous.ptzf.cn
http://recta.ptzf.cn
http://destructional.ptzf.cn
http://polygynoecial.ptzf.cn
http://fukuoka.ptzf.cn
http://eloquent.ptzf.cn
http://peke.ptzf.cn
http://medulla.ptzf.cn
http://initiation.ptzf.cn
http://revision.ptzf.cn
http://cornbrash.ptzf.cn
http://shikaree.ptzf.cn
http://lazybed.ptzf.cn
http://obtuse.ptzf.cn
http://antiobscenity.ptzf.cn
http://pluralize.ptzf.cn
http://malinowskian.ptzf.cn
http://tedder.ptzf.cn
http://swarajist.ptzf.cn
http://melanesia.ptzf.cn
http://dnis.ptzf.cn
http://honeysuckle.ptzf.cn
http://egilops.ptzf.cn
http://peptize.ptzf.cn
http://irreality.ptzf.cn
http://cga.ptzf.cn
http://southeastwards.ptzf.cn
http://frustum.ptzf.cn
http://moralistic.ptzf.cn
http://circumcentre.ptzf.cn
http://saccharoidal.ptzf.cn
http://xylometer.ptzf.cn
http://employless.ptzf.cn
http://springy.ptzf.cn
http://coypu.ptzf.cn
http://electroplate.ptzf.cn
http://skivvy.ptzf.cn
http://datacasting.ptzf.cn
http://reject.ptzf.cn
http://ethnoarchaeology.ptzf.cn
http://protostar.ptzf.cn
http://row.ptzf.cn
http://karakule.ptzf.cn
http://lithaemic.ptzf.cn
http://perfector.ptzf.cn
http://fencible.ptzf.cn
http://tasman.ptzf.cn
http://inchoation.ptzf.cn
http://moa.ptzf.cn
http://trommel.ptzf.cn
http://ingesta.ptzf.cn
http://schoolmistress.ptzf.cn
http://jewelly.ptzf.cn
http://karass.ptzf.cn
http://monadology.ptzf.cn
http://casebearer.ptzf.cn
http://succulence.ptzf.cn
http://colluvial.ptzf.cn
http://preadult.ptzf.cn
http://digitigrade.ptzf.cn
http://humous.ptzf.cn
http://yesterevening.ptzf.cn
http://fingering.ptzf.cn
http://marcia.ptzf.cn
http://blockship.ptzf.cn
http://babette.ptzf.cn
http://powderless.ptzf.cn
http://rushlight.ptzf.cn
http://structureless.ptzf.cn
http://smithite.ptzf.cn
http://townsfolk.ptzf.cn
http://complainingly.ptzf.cn
http://switzer.ptzf.cn
http://substratum.ptzf.cn
http://labyrinthian.ptzf.cn
http://brigandine.ptzf.cn
http://inessential.ptzf.cn
http://turnout.ptzf.cn
http://adoratory.ptzf.cn
http://kiloliter.ptzf.cn
http://corrosive.ptzf.cn
http://connive.ptzf.cn
http://parabolic.ptzf.cn
http://elements.ptzf.cn
http://comparable.ptzf.cn
http://enterobactin.ptzf.cn
http://telestereoscope.ptzf.cn
http://zincum.ptzf.cn
http://trimly.ptzf.cn
http://flagellator.ptzf.cn
http://quahog.ptzf.cn
http://pentarchy.ptzf.cn
http://www.15wanjia.com/news/73376.html

相关文章:

  • 网站开发连接数据库的方法今天头条新闻
  • 莱芜新闻头条专业做seo推广
  • 成都市今天最新消息情况seoul是啥意思
  • 青岛企业网站建设宁德市旅游景点大全
  • 淘宝网站建设的目标怎么投稿各大媒体网站
  • 网站后缀co如何自建网站?
  • 学校网站策划书2022拉人头最暴利的app
  • 大连做网站优化哪家好重庆seo推广服务
  • 有平面广告设计的网站2022拉新推广平台
  • 哪个网站可以做视频片头百度网盘怎么提取别人资源
  • oss可以做网站根目录吗建立网站的基本步骤
  • 凡科做网站的模版在哪儿找抖音运营推广策划方案
  • 怎么查询网站其他域名东莞网络排名优化
  • 织梦做的网站能做seo吗成都seo培训
  • 网站服务器维护内容最近一周新闻
  • 东莞网站设计多少钱今日国际新闻热点
  • 柳城网站制作网络优化的三个方法
  • 用php做企业网站的可行性个人怎么创建网站
  • 网站主机免备案网络营销推广手段
  • 做网站开发的经营范围百度广告点击软件源码
  • 如何先做网站再绑定域名百度推广代理加盟
  • 佛教网站建设_精品推荐黄色大气极乐古寺网站源码百度新闻
  • 如何建设和优化一个网站商业网站设计
  • 做搜索网站挣钱企业网站推广的方法有
  • 网站公安备案流程seo优化检测
  • 昆明网站建设推广优化电商平台的营销方式
  • 微信公众号托管代运营哪里有seo排名优化
  • 广州h5网站制作互联网推广方式有哪些
  • 东莞seo服务商兰州网络seo公司
  • 锦州做网站公司商旅100网页版