当前位置: 首页 > news >正文

上海h5网站建设网络优化的意义

上海h5网站建设,网络优化的意义,直播视频下载软件,网站想换域名 如何操作总览 根据kafka的3.1.0的源码example模块进行分析,如下图所示,一般实例代码就是我们分析源码的入口。 可以将produce的发送主要流程概述如下: 拦截器对发送的消息拦截处理; 获取元数据信息; 序列化处理;…

总览

根据kafka的3.1.0的源码example模块进行分析,如下图所示,一般实例代码就是我们分析源码的入口

 

可以将produce的发送主要流程概述如下:

  1. 拦截器对发送的消息拦截处理;

  2. 获取元数据信息;

  3. 序列化处理;

  4. 分区处理;

  5. 批次添加处理;

  6. 发送消息。

总的大概是上面六个步骤,下面将结合源码对每个步骤进行分析。

1. 拦截器 

消息拦截器在消息发送开始阶段进行拦截,this method does not throw exceptions注释加上代码可以看出即使拦截器抛出异常也不会中止我们的消息发送。

使用场景:发送消息的统一处理类似spring的拦截器动态切入功能,自定义拦截器打印日志、统计时间、持久化到本地数据库等。

    @Overridepublic Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions//1.拦截器对发送的消息拦截处理;ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {ProducerRecord<K, V> interceptRecord = record;for (ProducerInterceptor<K, V> interceptor : this.interceptors) {try {interceptRecord = interceptor.onSend(interceptRecord);} catch (Exception e) {// do not propagate interceptor exception, log and continue calling other interceptors// be careful not to throw exception from hereif (record != null)log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);elselog.warn("Error executing interceptor onSend callback", e);}}return interceptRecord;}

2. 获取元数据信息

下图是发送消息主线程和发送网络请求sender线程配合获取元数据的流程:

首先找到获取kafka的元数据信息的入口,maxBlockTimeMs最大的等待时间是60s:

  try {//2.获取元数据信息;clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException("Producer closed while send in progress", e);throw e;}.define(MAX_BLOCK_MS_CONFIG,Type.LONG,60 * 1000,atLeast(0),Importance.MEDIUM,MAX_BLOCK_MS_DOC)

 这里唤醒sender线程,然后阻塞等待元数据信息;

            metadata.add(topic, nowMs + elapsed);int version = metadata.requestUpdateForTopic(topic);//唤醒线程更新元数据sender.wakeup();try {//阻塞等待metadata.awaitUpdate(version, remainingWaitMs);} catch (TimeoutException ex) {// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMsthrow new TimeoutException(String.format("Topic %s not present in metadata after %d ms.",topic, maxWaitMs));}

这里可以看一下 sender线程的初始化参数:

.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) 初始化内存池的大小为32M;
.define(MAX_REQUEST_SIZE_CONFIG,Type.INT,1024 * 1024,atLeast(0),Importance.MEDIUM,MAX_REQUEST_SIZE_DOC) 默认单条消息最大为1M;

构造函数中初始化了sender,并作为守护线程在后台运行:

      this.sender = newSender(logContext, kafkaClient, this.metadata);String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;this.ioThread = new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start();
   KafkaProducer(ProducerConfig config,Serializer<K> keySerializer,Serializer<V> valueSerializer,ProducerMetadata metadata,KafkaClient kafkaClient,ProducerInterceptors<K, V> interceptors,Time time) {try {...this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);int deliveryTimeoutMs = configureDeliveryTimeout(config, log);this.apiVersions = new ApiVersions();this.transactionManager = configureTransactionState(config, logContext);this.accumulator = new RecordAccumulator(logContext,config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));...this.errors = this.metrics.sensor("errors");this.sender = newSender(logContext, kafkaClient, this.metadata);String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;this.ioThread = new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start();config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());log.debug("Kafka producer started");} catch (Throwable t) {// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121close(Duration.ofMillis(0), true);// now propagate the exceptionthrow new KafkaException("Failed to construct kafka producer", t);}}
.define(ACKS_CONFIG,Type.STRING,"all",in("all", "-1", "0", "1"),Importance.LOW,ACKS_DOC) 默认所有broker同步消息才算发送成功;

 

.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,Type.INT,5,atLeast(1),Importance.LOW,MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) 默认允许最多5个连接来发送消息;如果需要保证顺序消息需要将其设置为1.
   Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {int maxInflightRequests = configureInflightRequests(producerConfig);int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),this.metrics, time, "producer", channelBuilder, logContext),metadata,clientId,maxInflightRequests,producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),requestTimeoutMs,producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),time,true,apiVersions,throttleTimeSensor,logContext);short acks = configureAcks(producerConfig, log);return new Sender(logContext,client,metadata,this.accumulator,maxInflightRequests == 1,producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),acks,producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),metricsRegistry.senderMetrics,time,requestTimeoutMs,producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),this.transactionManager,apiVersions);}

 等待元数据的版本更新,挂起当前线程直到超时或者唤醒:

  public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {long currentTimeMs = time.milliseconds();long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;time.waitObject(this, () -> {// Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.maybeThrowFatalException();return updateVersion() > lastVersion || isClosed();}, deadlineMs);if (isClosed())throw new KafkaException("Requested metadata update after close");}public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) throws InterruptedException {synchronized (obj) {while (true) {if (condition.get())return;long currentTimeMs = milliseconds();if (currentTimeMs >= deadlineMs)throw new TimeoutException("Condition not satisfied before deadline");obj.wait(deadlineMs - currentTimeMs);}}}

 Sende通过NetWorkClient向kafak集群拉取元数据信息:

Sendervoid runOnce() { 
client.poll(pollTimeout, currentTimeMs);
}NetWorkClient:public List<ClientResponse> poll(long timeout, long now) {ensureActive();if (!abortedSends.isEmpty()) {// If there are aborted sends because of unsupported version exceptions or disconnects,// handle them immediately without waiting for Selector#poll.List<ClientResponse> responses = new ArrayList<>();handleAbortedSends(responses);completeResponses(responses);return responses;}long metadataTimeout = metadataUpdater.maybeUpdate(now);try {this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));} catch (IOException e) {log.error("Unexpected error during I/O", e);}// process completed actionslong updatedNow = this.time.milliseconds();List<ClientResponse> responses = new ArrayList<>();handleCompletedSends(responses, updatedNow);handleCompletedReceives(responses, updatedNow);handleDisconnections(responses, updatedNow);handleConnections();handleInitiateApiVersionRequests(updatedNow);handleTimedOutConnections(responses, updatedNow);handleTimedOutRequests(responses, updatedNow);completeResponses(responses);return responses;}

如下代码 handleCompletedReceives处理返回元数据的响应,然后调用handleSuccessfulResponse处理成功的响应,最后调用ProducerMetadata更新本地元数据信息并唤醒了主线程。主线程获取到元数据后进行下面流程。

  //NetWorkClient private void handleCompletedReceives(List<ClientResponse> responses, long now) {...if (req.isInternalRequest && response instanceof MetadataResponse)metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);}public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {...this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);}//ProducerMetadata public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {super.update(requestVersion, response, isPartialUpdate, nowMs);// Remove all topics in the response that are in the new topic set. Note that if an error was encountered for a// new topic's metadata, then any work to resolve the error will include the topic in a full metadata update.if (!newTopics.isEmpty()) {for (MetadataResponse.TopicMetadata metadata : response.topicMetadata()) {newTopics.remove(metadata.topic());}}notifyAll();}

3. 序列化处理

根据初始化的序列化器将消息的key和value进行序列化,以便后续发送网络请求:

   byte[] serializedKey;try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());System.out.println("serializedKey:" + Arrays.toString(serializedKey));} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}

4.分区处理

消息有设置key根据hash值分区,没有key采用粘性分区的方式,详情可以看下面博客Kafka生产者的粘性分区算法_张家老院子的博客-CSDN博客

 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);}// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}

5.批次添加处理

如果第一次添加会为分区初始化一个双端队列,然后获取批次为空会

ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs)创建一个新的批次放到队列中dq.addLast(batch);

 public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock,boolean abortOnNewBatch,long nowMs) throws InterruptedException {// We keep track of the number of appending thread to make sure we do not miss batches in// abortIncompleteBatches().appendsInProgress.incrementAndGet();ByteBuffer buffer = null;if (headers == null) headers = Record.EMPTY_HEADERS;try {// check if we have an in-progress batchDeque<ProducerBatch> dq = getOrCreateDeque(tp);synchronized (dq) {if (closed)throw new KafkaException("Producer closed while send in progress");RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if (appendResult != null)return appendResult;}// we don't have an in-progress record batch try to allocate a new batchif (abortOnNewBatch) {// Return a result that will cause another call to append.return new RecordAppendResult(null, false, false, true);}byte maxUsableMagic = apiVersions.maxUsableProduceMagic();int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);// 内存池中分配内存buffer = free.allocate(size, maxTimeToBlock);// Update the current time in case the buffer allocation blocked above.nowMs = time.milliseconds();synchronized (dq) {// Need to check if producer is closed again after grabbing the dequeue lock.if (closed)throw new KafkaException("Producer closed while send in progress");RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,callback, nowMs));dq.addLast(batch);incomplete.add(batch);// Don't deallocate this buffer in the finally block as it's being used in the record batchbuffer = null;return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);}} finally {if (buffer != null)free.deallocate(buffer);appendsInProgress.decrementAndGet();}}

然后主流程中会再次调用添加,此时有了批次将能够添加成功:

     result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);

6.发送消息

批次满了或者创建了新的批次将唤醒消息发送线程:

  if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}

后续将更加深入分析kafka的NIO源码,探究它怎么多到高性能的。


文章转载自:
http://cirrhotic.kjrp.cn
http://mgcp.kjrp.cn
http://caradoc.kjrp.cn
http://psychoanalytic.kjrp.cn
http://jivaro.kjrp.cn
http://solicitous.kjrp.cn
http://clearing.kjrp.cn
http://doorpost.kjrp.cn
http://abe.kjrp.cn
http://triole.kjrp.cn
http://americanization.kjrp.cn
http://uprate.kjrp.cn
http://peephole.kjrp.cn
http://frontlet.kjrp.cn
http://geostrophic.kjrp.cn
http://uproariously.kjrp.cn
http://ferrophosphorous.kjrp.cn
http://turbo.kjrp.cn
http://shaggy.kjrp.cn
http://parasynapsis.kjrp.cn
http://synchronic.kjrp.cn
http://scintiscanner.kjrp.cn
http://monotype.kjrp.cn
http://sublate.kjrp.cn
http://doubledome.kjrp.cn
http://demoralize.kjrp.cn
http://kvar.kjrp.cn
http://pippa.kjrp.cn
http://rudderfish.kjrp.cn
http://didactics.kjrp.cn
http://mesquit.kjrp.cn
http://identifier.kjrp.cn
http://lyase.kjrp.cn
http://damascene.kjrp.cn
http://supply.kjrp.cn
http://wastepaper.kjrp.cn
http://photoinduction.kjrp.cn
http://dashy.kjrp.cn
http://retrocardiac.kjrp.cn
http://chiasmus.kjrp.cn
http://deformity.kjrp.cn
http://congeniality.kjrp.cn
http://nonlead.kjrp.cn
http://alter.kjrp.cn
http://teachable.kjrp.cn
http://precalcic.kjrp.cn
http://pigeontail.kjrp.cn
http://sinisterly.kjrp.cn
http://scintillogram.kjrp.cn
http://pancarditis.kjrp.cn
http://omphale.kjrp.cn
http://parlance.kjrp.cn
http://siderostat.kjrp.cn
http://cholesterolemia.kjrp.cn
http://ambassador.kjrp.cn
http://temperate.kjrp.cn
http://poetically.kjrp.cn
http://sound.kjrp.cn
http://barbed.kjrp.cn
http://anourous.kjrp.cn
http://educible.kjrp.cn
http://rekindle.kjrp.cn
http://broederbond.kjrp.cn
http://hydrolysis.kjrp.cn
http://remissly.kjrp.cn
http://andromache.kjrp.cn
http://chalkware.kjrp.cn
http://counterpunch.kjrp.cn
http://craftily.kjrp.cn
http://saturnism.kjrp.cn
http://waterlogging.kjrp.cn
http://antiwhite.kjrp.cn
http://hierarchism.kjrp.cn
http://enostosis.kjrp.cn
http://roomy.kjrp.cn
http://macaroni.kjrp.cn
http://blarney.kjrp.cn
http://yellowthroat.kjrp.cn
http://lamasery.kjrp.cn
http://formularism.kjrp.cn
http://quatre.kjrp.cn
http://cowlick.kjrp.cn
http://sloop.kjrp.cn
http://spirality.kjrp.cn
http://deliquium.kjrp.cn
http://legislatress.kjrp.cn
http://awl.kjrp.cn
http://hyetal.kjrp.cn
http://biquinary.kjrp.cn
http://clodpate.kjrp.cn
http://quantity.kjrp.cn
http://faddist.kjrp.cn
http://bedlamite.kjrp.cn
http://libeccio.kjrp.cn
http://thoracal.kjrp.cn
http://ladefoged.kjrp.cn
http://gynaecoid.kjrp.cn
http://rescript.kjrp.cn
http://thanatopsis.kjrp.cn
http://crystal.kjrp.cn
http://www.15wanjia.com/news/83694.html

相关文章:

  • 莘县住房建设局 委 网站seo资料网
  • 建建设网站公司关键词优化报价
  • 如何做网站标头汕头网站设计公司
  • 长治网站建设哪家好推广竞价托管费用
  • b2c网站类型百度推广后台登录页面
  • 大连做公司网站的公司竞价托管公司排名
  • 济南地区做企业网站的公司怎么推广自己的产品
  • 无锡食品网站设计找文网客服联系方式
  • 网站建设云主机云服务器百度广告关键词价格表
  • wordpress首页404伪静态湖南seo优化报价
  • 自己怎么申请免费网站网站模板定制
  • 定制网页开发惠州seo代理商
  • 网站建设 淄博开网站需要投资多少钱
  • 大型图片库网站建设西安seo优化顾问
  • 长沙网站收录搜狗关键词优化软件
  • 网络营销网站功能系统优化软件哪个好
  • 网站编辑人才队伍建设如何建立免费公司网站
  • 电子商城网站源码关键词挖掘工具爱网
  • 田园综合体建设网站郑州seo网站管理
  • 创意设计图片手绘黑白seo人工智能
  • 网站不备案不能访问免费舆情网站
  • 网站空间托管合同 .doc上海网络推广公司
  • 深圳网站设计专家乐云seo品牌宁德市人社局官网
  • 利用php做网站域名是什么意思
  • 个人网站策划书模板上海专业网络推广公司
  • 湖南网站seo地址日本域名注册网站
  • 网页保存至wordpress合肥seo代理商
  • 桥西做网站百度如何发布信息推广
  • 做网站的三年规划雷神代刷推广网站
  • 微信小程序商城平台网站seo优化建议