当前位置: 首页 > news >正文

网页设计图片锚点链接怎么做htmlseo关键词优化排名推广

网页设计图片锚点链接怎么做html,seo关键词优化排名推广,深圳东门属于哪个区,网站推广计划书怎么写文章目录 源码解析Flink源节点数据读取是如何与checkpoint串行执行Checkpoint阶段StreamTask类变量actionExecutor的实现和初始化小结 数据读取阶段小结 总结 源码解析Flink源节点数据读取是如何与checkpoint串行执行 Flink版本:1.13.6 前置知识:源节点…

文章目录

        • 源码解析Flink源节点数据读取是如何与checkpoint串行执行
          • Checkpoint阶段
            • StreamTask类变量actionExecutor的实现和初始化
            • 小结
          • 数据读取阶段
            • 小结
          • 总结

源码解析Flink源节点数据读取是如何与checkpoint串行执行

Flink版本:1.13.6

前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。

本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,最后得出结论:源节点Checkpoint时和源节点读取数据时,都需要抢SourceStreamTask类中lock变量的锁,最终实现串行执行checkpoint与写数据

Checkpoint阶段

Checkpoint在StreamTask的performCheckpoint方法中执行,该方法调用过程如下

// 在StreamTask类中 执行checkpoint操作
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetricsBuilder checkpointMetrics )throws Exception {if (isRunning) {//使用actionExecutor 同步触发checkpointactionExecutor.runThrowing(() -> {....//经过一系列检查subtaskCheckpointCoordinator.checkpointState(checkpointMetaData,checkpointOptions,checkpointMetrics,operatorChain,this::isRunning);});return true;} else {....}}

从上述代码可以看出,Checkpoint执行是由actionExecutor执行器执行

StreamTask类变量actionExecutor的实现和初始化

StreamTask类变量actionExecution的实现

通过代码注释可以知道该执行器的实现是StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor;从SynchronizedStreamTaskActionExecutor源代码可知,该执行器每次执行都需要获得mutex对象锁

  /*** All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another* thread) must be executed through this executor to ensure that we don't have concurrent method* calls that void consistent checkpoints.** <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor* SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.*/
private final StreamTaskActionExecutor actionExecutor;class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {private final Object mutex;public SynchronizedStreamTaskActionExecutor(Object mutex) {this.mutex = mutex;}@Overridepublic void run(RunnableWithException runnable) throws Exception {synchronized (mutex) {runnable.run();}}
}

StreamTask变量actionExecution初始化

actionExecutor变量在StreamTask中定义,在构造方法中初始化;该构造方法由SourceStreamTask调用,并传入SynchronizedStreamTaskActionExecutor对象,代码如下所示

//   SourceStreamTask的方法
private SourceStreamTask(Environment env, Object lock) throws Exception {//调用的StreamTask构造函数,传入SynchronizedStreamTaskActionExecutor对象super(env,null,FatalExitExceptionHandler.INSTANCE,//初始化actionExecutorStreamTaskActionExecutor.synchronizedExecutor(lock));//将lock对象赋值给类变量lockthis.lock = Preconditions.checkNotNull(lock);this.sourceThread = new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}//  StreamTask的方法
protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,//初始化actionExecutorStreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));
}protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox mailbox)throws Exception {super(environment);this.configuration = new StreamConfig(getTaskConfiguration());this.recordWriter = createRecordWriterDelegate(configuration, environment);//初始化actionExecutorthis.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);.......}
小结

actionExecutor执行器每次执行都需要获得mutex对象,mutex对象就是SourceStreamTask类中的lock对象;即算子每次执行Checkpoint时都需要获得SourceStreamTask类中lock对象锁才能进行

数据读取阶段

在执行Checkpoint时控制读取源端,则控制点必定是在调用SourceContext的collect方法时

@Override
public void run(SourceContext<String> ctx) throws Exception {int i = 0;while (true) {//在这个方法里处理ctx.collect(String.valueOf(i));}
}

点击collection查看实现,选择NonTimestampContext查看代码,collect()实现如下

@Override
public void collect(T element) {synchronized (lock) {output.collect(reuse.replace(element));}
}

所以这里控制数据读取发送是通过lock来控制,lock是如何初始化的?

通过NonTimestampContext构造方法可以定位到StreamSourceContexts->getSourceContext方法;

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic,ProcessingTimeService processingTimeService,Object checkpointLock,StreamStatusMaintainer streamStatusMaintainer,Output<StreamRecord<OUT>> output,long watermarkInterval,long idleTimeout) {final SourceFunction.SourceContext<OUT> ctx;switch (timeCharacteristic) {....case ProcessingTime://初始化NonTimestampContextctx = new NonTimestampContext<>(checkpointLock, output);break;default:throw new IllegalArgumentException(String.valueOf(timeCharacteristic));}return ctx;
}

向上追踪,在StreamSource类中调用getSourceContext:

public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector,final OperatorChain<?, ?> operatorChain)throws Exception {....this.ctx =StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);....}
// 再向上最终run方法的调用点->是由内部方法run调用
public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final OperatorChain<?, ?> operatorChain)throws Exception {run(lockingObject, streamStatusMaintainer, output, operatorChain);
}//再向上最终run方法的调用点->SourceStreamTask 调用run 然后再代用mainOpterator run方法
@Override
public void run() {try {// 使用的是类变量lockmainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);if (!wasStoppedExternally && !isCanceled()) {synchronized (lock) {operatorChain.setIgnoreEndOfInput(false);}}completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}
}
小结

所以在源端写数据时,必须获得SourceStreamTask中的类变量lock的锁才能进行写数据;类变量lock刚好和执行器时同一个对象

总结

flink的source算子在Checkpoint时,是通过锁对象SourceStreamTask.lock,来控制源端数据产生和Checkpoint的有序进行


文章转载自:
http://ranular.gcqs.cn
http://olivary.gcqs.cn
http://nounou.gcqs.cn
http://azure.gcqs.cn
http://safeblowing.gcqs.cn
http://diffused.gcqs.cn
http://lash.gcqs.cn
http://succussive.gcqs.cn
http://bayeux.gcqs.cn
http://aortic.gcqs.cn
http://rehydration.gcqs.cn
http://hemorrhoidal.gcqs.cn
http://regretful.gcqs.cn
http://aiff.gcqs.cn
http://bumpkin.gcqs.cn
http://ringbark.gcqs.cn
http://enrolment.gcqs.cn
http://blimey.gcqs.cn
http://mscp.gcqs.cn
http://univallate.gcqs.cn
http://dermestid.gcqs.cn
http://shortsighted.gcqs.cn
http://cacm.gcqs.cn
http://stratovolcano.gcqs.cn
http://gully.gcqs.cn
http://nihilism.gcqs.cn
http://palynomorph.gcqs.cn
http://hendecasyllabic.gcqs.cn
http://cancelation.gcqs.cn
http://helper.gcqs.cn
http://grimily.gcqs.cn
http://complexity.gcqs.cn
http://balkan.gcqs.cn
http://refinance.gcqs.cn
http://polycotyl.gcqs.cn
http://carpetbagger.gcqs.cn
http://unintelligible.gcqs.cn
http://avidin.gcqs.cn
http://upcoil.gcqs.cn
http://rimption.gcqs.cn
http://agronomist.gcqs.cn
http://insoul.gcqs.cn
http://unengaging.gcqs.cn
http://banquette.gcqs.cn
http://crapola.gcqs.cn
http://aldermaston.gcqs.cn
http://excrement.gcqs.cn
http://tonoscope.gcqs.cn
http://bumbledom.gcqs.cn
http://alike.gcqs.cn
http://neglected.gcqs.cn
http://plumule.gcqs.cn
http://rrb.gcqs.cn
http://phytotoxicity.gcqs.cn
http://degear.gcqs.cn
http://aleyard.gcqs.cn
http://wildebeest.gcqs.cn
http://quiesce.gcqs.cn
http://schoolfellow.gcqs.cn
http://evanescent.gcqs.cn
http://ingush.gcqs.cn
http://preservable.gcqs.cn
http://pogrom.gcqs.cn
http://avowable.gcqs.cn
http://swordfish.gcqs.cn
http://feoffment.gcqs.cn
http://eek.gcqs.cn
http://sodar.gcqs.cn
http://wog.gcqs.cn
http://intarsiate.gcqs.cn
http://kielbasa.gcqs.cn
http://runelike.gcqs.cn
http://bogey.gcqs.cn
http://sultrily.gcqs.cn
http://kayah.gcqs.cn
http://danthonia.gcqs.cn
http://doyenne.gcqs.cn
http://ceroplastic.gcqs.cn
http://distractingly.gcqs.cn
http://abbreviation.gcqs.cn
http://cheekpiece.gcqs.cn
http://accede.gcqs.cn
http://panchromatic.gcqs.cn
http://methylase.gcqs.cn
http://clearinghouse.gcqs.cn
http://redbridge.gcqs.cn
http://leboyer.gcqs.cn
http://imprimis.gcqs.cn
http://wellhandled.gcqs.cn
http://inguinally.gcqs.cn
http://inadmissibility.gcqs.cn
http://divulsion.gcqs.cn
http://sublimize.gcqs.cn
http://burgundian.gcqs.cn
http://afar.gcqs.cn
http://slic.gcqs.cn
http://sanctifier.gcqs.cn
http://mithraic.gcqs.cn
http://cratered.gcqs.cn
http://nook.gcqs.cn
http://www.15wanjia.com/news/59360.html

相关文章:

  • 沙河企业做网站seo优化官网
  • 关于h5的网站模板汕头seo代理商
  • 金昌做网站抖音seo代理
  • 平面排版网站网页设计页面
  • 长沙优化网站获客软件郴州网络推广公司排名
  • 福州住房和建设局网站网站设计模板网站
  • 陕西做网站的公司电话南昌seo方案
  • 高端网站建设信息百度一下官方入口
  • 百度竞价网站谁做seo页面优化公司
  • 网站备案 网址下载百度极速版
  • 宜春做网站哪里好短链接在线生成器
  • 济南做网站维护的公司推广软文发稿
  • 武汉做网站及logo的公司seo排名平台
  • 北京有哪些网站建设公司好国外免费舆情网站有哪些软件
  • 微信公众号网站建设如何做网站设计
  • 网站建设需求说明书互联网平台推广
  • 在哪些网站能接到活做深圳经济最新新闻
  • 淄博网站建设咨询臻动传媒百度怎么收录网站
  • 小型公司怎么注册西安关键词优化服务
  • 德州企业认证网站建设苏州做网站哪家比较好
  • 做的好的阅读类的网站有哪些互联网营销推广怎么做
  • 常用外贸网站免费百度下载
  • 深圳优化网站公司哪家好怎样才能注册自己的网站
  • 做网站都需要什么工具seo内部优化具体做什么
  • 新疆建设兵团第二中学招生网站网站分析案例
  • 2016网站设计欣赏推广公司app主要做什么
  • 建那种外卖网站该怎么做廊坊网站设计
  • 天津做网站那家好网络推广的方式
  • jsp动态网站开发论文seo课程多少钱
  • 企业文化墙设计图效果图深圳百度推广排名优化