当前位置: 首页 > news >正文

福州网站建设推进微信seo是什么意思

福州网站建设推进,微信seo是什么意思,wordpress侧栏文章,市桥有经验的网站建设目录 一.引言 二.源码浅析 1.RDD.saveAsTextFile 2.TextOutputFormat 3.FileOutputFormat 三.源码修改 1.修改文件生成逻辑 - getRecordWriter 2.允许目录存在 - checkoutputSpecs 3.全部代码 - TextOutputFormatV2 四.追加存储代码实战 五.总结 一.引言 Output d…

目录

一.引言

二.源码浅析

1.RDD.saveAsTextFile

2.TextOutputFormat 

3.FileOutputFormat

三.源码修改

1.修改文件生成逻辑 - getRecordWriter

2.允许目录存在 - checkoutputSpecs

3.全部代码 - TextOutputFormatV2

四.追加存储代码实战

五.总结


一.引言

Output directory file XXX already exists 目标目录已经存在,这个报错写 Spark 的同学都不会陌生,它不允许我们在同一个目录持续增加文件存储。在使用 Flink 文件流场景中,我们有向 HDFS 目录追加文件的需求,所以下面我们尝试继承 FileOutputFormat 实现自定义文件追加。

二.源码浅析

自定义实现之前,我们需要明确一下追加文件的两个主要问题:

- 允许目录存在

即避免 Output directory file XXX already exists 的报错

- 避免 File 重复

由于是追加文件,这里我们要避免文件名相同导致追加失败

1.RDD.saveAsTextFile

这个是我们日常使用的文本落地 API:

这里将原始的 RDD[String] 转化为 (NullWritable.get(), text) 的 pairRDD 并调用后续的 saveAsHadoopFile 并传入 TextOutputFormat:

- NullWritable

在Hadoop1.+中是 "Comparable",因此编译器无法找到隐式为其排序,并将使用默认的 "null"。然而,它是一个 `Comparable[NullWritable]` 在 Hadoop2.+ 中,编译器将调用隐式  "Ordering.ordered” 方法来创建为 "NullWritable" 排序。这就是为什么编译器会生成不同的匿名Hadoop1.+和Hadoop2.+中“saveAsTextFile”的类。因此,在这里我们提供了一个显式排序“null”,以确保编译器生成 "saveAsTextFile" 的字节码相同。

这里翻译自官方 API,简言之,我们后续自定义 OutputFormat 时需要将 RDD[String] 转换为 PairRDD 并将 key 置为 NullWritable,否则这里无法调用 saveAsHadoopFile:

2.TextOutputFormat 

前者继承了后者,我们先看下前者复写了哪些函数:

非常简洁,大部分方法都使用父类的实现,这里 getRecordWriter 的具体实现对应我们上面提到的第二个问题即避免文件重复,因为其负责根据 name 轮训生成落地的 Path 地址,我们修改这个函数即可避免追加时文件重复。

3.FileOutputFormat

TextOutputFormat Extends FileOutputFormat,上面 TextOutputFormat 只实现了 Path 相关的工作,所以需要继续到父类 FileOutputFormat 寻找抛出异常的语句:

在文件中搜索 already exists 即可定位到当前函数,是不是很熟悉,因此针对第一个问题避免文件的报错就要修改这里了,最简单的我们把这三行注释掉即可。

三.源码修改

经过上面的源码三部曲,我们如何修改 TextOutputFormat 思路也很清晰了,修改文件生成逻辑、取消抛出异常即可,下面看一下代码实现:

1.修改文件生成逻辑 - getRecordWriter

这里我们在源码中增加了 updateFileName 函数,该函数由用户自己定义输出文件名,常规的我们可以按照 part-00000、part-00001 的顺序继续存储下去,当然如果为了区分追加文件的添加时间与类型,我们也可以给其打上时间戳和自定义标记,都不想用的话也可以直接用 UUID 代替:

import java.util.UUIDUUID.randomUUID().toString

下面看一下 updateFileName 的实现:

这里初始化变量 fileName 为 "",更新时对其加锁并基于当前 name 进行判断,如果当前 fileName 为 "",则代表是第一次保存,因此默认使用 name 的 part-00000,后续再多次存储是,我们就可以获取 fileName 的后缀进行累加输出了,这里使用 DecimalFormat 实现了自动补 0 的操作。

  static String fileName = "";DecimalFormat decimalFormat = new DecimalFormat("00000");public void updateFileName(String name) {synchronized (fileName) {if (fileName.equals("")) {fileName = name;} else {fileName = "part-" + decimalFormat.format(Integer.parseInt(fileName.split("-")[1]) + 1);}}}

2.允许目录存在 - checkoutputSpecs

把 throw Exception 的异常去掉就好了,这里保留了 Println 提示目录已经存在并开始追加。

3.全部代码 - TextOutputFormatV2

换个 TextOutputFormatV2 实现我们追加文件存储的目的。

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DecimalFormat;import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.*;/*** An {@link OutputFormat} that writes plain text files.*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormatV2<K, V> extends FileOutputFormat<K, V> {static String fileName = "";DecimalFormat decimalFormat = new DecimalFormat("00000");public void updateFileName(String name) {synchronized (fileName) {if (fileName.equals("")) {fileName = name;} else {fileName = "part-" + decimalFormat.format(Integer.parseInt(fileName.split("-")[1]) + 1);}}}protected static class LineRecordWriter<K, V>implements RecordWriter<K, V> {private static final String utf8 = "UTF-8";private static final byte[] newline;static {try {newline = "\n".getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}protected DataOutputStream out;private final byte[] keyValueSeparator;public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {this.out = out;try {this.keyValueSeparator = keyValueSeparator.getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}public LineRecordWriter(DataOutputStream out) {this(out, "\t");}/*** Write the object to the byte stream, handling Text as a special* case.* @param o the object to print* @throws IOException if the write throws, we pass it on*/private void writeObject(Object o) throws IOException {if (o instanceof Text) {Text to = (Text) o;out.write(to.getBytes(), 0, to.getLength());} else {out.write(o.toString().getBytes(utf8));}}public synchronized void write(K key, V value)throws IOException {boolean nullKey = key == null || key instanceof NullWritable;boolean nullValue = value == null || value instanceof NullWritable;if (nullKey && nullValue) {return;}if (!nullKey) {writeObject(key);}if (!(nullKey || nullValue)) {out.write(keyValueSeparator);}if (!nullValue) {writeObject(value);}out.write(newline);}public synchronized void close(Reporter reporter) throws IOException {out.close();}}@Overridepublic void checkOutputSpecs(FileSystem ignored, JobConf job)throws FileAlreadyExistsException,InvalidJobConfException, IOException {// Ensure that the output directory is set and not already therePath outDir = getOutputPath(job);if (outDir == null && job.getNumReduceTasks() != 0) {throw new InvalidJobConfException("Output directory not set in JobConf.");}if (outDir != null) {FileSystem fs = outDir.getFileSystem(job);// normalize the output directoryoutDir = fs.makeQualified(outDir);setOutputPath(job, outDir);// get delegation token for the outDir's file systemTokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] {outDir}, job);// check its existenceif (fs.exists(outDir)) {System.out.println("Output directory " + outDir + " already exists, Start Append!");}}}public RecordWriter<K, V> getRecordWriter(FileSystem ignored,JobConf job,String name,Progressable progress)throws IOException {boolean isCompressed = getCompressOutput(job);String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator","\t");updateFileName(name);if (!isCompressed) {Path file = FileOutputFormat.getTaskOutputPath(job, fileName);FileSystem fs = file.getFileSystem(job);FSDataOutputStream fileOut = fs.create(file, progress);return new com.CommonTool.TextOutputFormatV2.LineRecordWriter<K, V>(fileOut, keyValueSeparator);} else {Class<? extends CompressionCodec> codecClass =getOutputCompressorClass(job, GzipCodec.class);// create the named codecCompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);// build the filename including the extensionPath file =FileOutputFormat.getTaskOutputPath(job,fileName + codec.getDefaultExtension());FileSystem fs = file.getFileSystem(job);FSDataOutputStream fileOut = fs.create(file, progress);return new com.CommonTool.TextOutputFormatV2.LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);}}
}

四.追加存储代码实战

  def main(args: Array[String]): Unit = {val (argsList, argsMap) = ArgsParseUtil.parseArgs(args)val conf = (new SparkConf).setAppName("AppendFileToHdfs").setMaster("local[*]")val spark = SparkSession.builder.config(conf).getOrCreate()val sc = spark.sparkContextval output = argsMap.getOrElse("output", "./append_output")val data = sc.parallelize(0 to 1000).mapPartitions { iter =>val text = new Text()iter.map { x =>text.set(x.toString)(NullWritable.get(), text)}}(0 until 10).foreach(epoch => {// 存储一次无法继续存储// data.saveAsTextFile(output)data.saveAsHadoopFile(output, classOf[NullWritable], classOf[Text], classOf[TextOutputFormatV2[NullWritable, String]])})}

首先转化为 (NullWritable.get(), text) 的 pairRDD,随后调用 saveAsHadoopFile 方法并传入我们自定义的 TextOutputFormatV2 即可,由于我们 for 循环了 10 次,所以打印了 10 个相关日志。

下面再看下生成的文件:

每个文件存 250 个数字,每次存储 4 个 part,10 次 40 个且保持递增顺序。

五.总结

想要结合源码进行修改时,结合自己的需求,带着问题去找对应的函数再复写就 OK 了,由于这里逻辑比较简单所以我们也没踩太多坑。上面采用的是单次 Job 连续存储,所以 Format 里 FileName 能够达到累加的情况,如果是多个 Job 重复启动,则每次获取的都是 part-00000,这时如果还想要保持文件名递增的话可以使用 FileSystem.listStatus 遍历文件夹获取 modifyTime 最新的文件并取其 name 即可拿到最新的文件名 part,此时将参数传入 TextFormat 并修改 update 逻辑即可实现多 Job 重复启动且文件名递增的需求了。当然了,使用 UUID + Date 是最省事滴。


文章转载自:
http://impropriator.xkzr.cn
http://thing.xkzr.cn
http://panellist.xkzr.cn
http://unheated.xkzr.cn
http://newshen.xkzr.cn
http://appaloosa.xkzr.cn
http://mandala.xkzr.cn
http://weightlessness.xkzr.cn
http://financial.xkzr.cn
http://pastille.xkzr.cn
http://slavism.xkzr.cn
http://batfish.xkzr.cn
http://zygosis.xkzr.cn
http://weightiness.xkzr.cn
http://flowerer.xkzr.cn
http://wheatland.xkzr.cn
http://subspecialty.xkzr.cn
http://inelegantly.xkzr.cn
http://overprotect.xkzr.cn
http://pollack.xkzr.cn
http://indestructibility.xkzr.cn
http://non.xkzr.cn
http://ranging.xkzr.cn
http://undro.xkzr.cn
http://naxos.xkzr.cn
http://alchemistical.xkzr.cn
http://magnetoscope.xkzr.cn
http://literalness.xkzr.cn
http://phenobarbital.xkzr.cn
http://nfl.xkzr.cn
http://spectroheliometer.xkzr.cn
http://brutishly.xkzr.cn
http://luckless.xkzr.cn
http://imputable.xkzr.cn
http://recycle.xkzr.cn
http://toxaemic.xkzr.cn
http://microcapsule.xkzr.cn
http://nonsoap.xkzr.cn
http://unruliness.xkzr.cn
http://discover.xkzr.cn
http://unavoidably.xkzr.cn
http://cupronickel.xkzr.cn
http://accept.xkzr.cn
http://monohybrid.xkzr.cn
http://chemoreceptive.xkzr.cn
http://wiener.xkzr.cn
http://committee.xkzr.cn
http://crikey.xkzr.cn
http://cerigo.xkzr.cn
http://paginate.xkzr.cn
http://catenate.xkzr.cn
http://zarape.xkzr.cn
http://metasomatosis.xkzr.cn
http://lignitoid.xkzr.cn
http://superciliousness.xkzr.cn
http://usable.xkzr.cn
http://pickerel.xkzr.cn
http://subchief.xkzr.cn
http://uncensored.xkzr.cn
http://mischievously.xkzr.cn
http://greedily.xkzr.cn
http://quandary.xkzr.cn
http://galibi.xkzr.cn
http://greenboard.xkzr.cn
http://hgv.xkzr.cn
http://hmbs.xkzr.cn
http://enforce.xkzr.cn
http://embryotroph.xkzr.cn
http://diskdupe.xkzr.cn
http://financing.xkzr.cn
http://helienise.xkzr.cn
http://malabar.xkzr.cn
http://noncontent.xkzr.cn
http://apogean.xkzr.cn
http://archive.xkzr.cn
http://grasshopper.xkzr.cn
http://javaite.xkzr.cn
http://hektostere.xkzr.cn
http://fumarole.xkzr.cn
http://tokyo.xkzr.cn
http://benthon.xkzr.cn
http://linenfold.xkzr.cn
http://metanalysis.xkzr.cn
http://ingress.xkzr.cn
http://thirdly.xkzr.cn
http://flammulation.xkzr.cn
http://trapezia.xkzr.cn
http://cynegetics.xkzr.cn
http://lipsalve.xkzr.cn
http://museology.xkzr.cn
http://cckw.xkzr.cn
http://nogg.xkzr.cn
http://turnoff.xkzr.cn
http://imput.xkzr.cn
http://bacterioscopy.xkzr.cn
http://liability.xkzr.cn
http://aspirin.xkzr.cn
http://infelt.xkzr.cn
http://carotenoid.xkzr.cn
http://paltry.xkzr.cn
http://www.15wanjia.com/news/105199.html

相关文章:

  • 个人做电影网站服务器放国外安全吗足球积分排行榜最新
  • 什么网站做软件任务挣钱线上销售的方法和技巧
  • 用wordpress做答题网站宁波微信推广平台哪个好
  • wordpress按修改时间排序网站怎么优化关键词快速提升排名
  • 网站建设网页开发企业qq邮箱
  • 点击网站郑州疫情最新动态
  • 写作网站制作东莞产品网络推广
  • 做电商网站哪家好秦皇岛seo排名
  • 湘西网站制作专业的seo排名优化
  • 地方网站盈利北京seo排名厂家
  • 网站建议公司西安优化排名推广
  • 新手学做网站pdf网站排名靠前方法
  • 镇江网站建设咨询深圳百度推广
  • 宿州政府网站建设关键词排名关键词快速排名
  • 男同志网站建设seo按照搜索引擎的什么对网站
  • 盐城中小企业网络推广网站seo外包价格
  • 免费申请网站域名怎么在百度上推广产品
  • 滴滴优惠券网站怎么做的西安seo排名外包
  • 公司网站做推广支出分录电脑优化软件
  • 网站如何做业务南宁百度seo公司
  • 武汉手机网站排名优化方法
  • 贵州做网站小程序开发需要多少钱
  • 全民电竞app的制作公司网站seo具体怎么做
  • 网站的pdf目录怎么做的人民网疫情最新消息
  • 企业网站推广怎么做短视频培训课程
  • 网站建设启动资金预算营销排名seo
  • 个人网站 如何做推广百度贴吧人工客服电话
  • 域名备案完了怎么做网站网络推广工作内容
  • 做爰视频免费观看网站优秀营销软文范例800字
  • 泉州大型网站建设武汉seo排名扣费