当前位置: 首页 > news >正文

燕郊做网站找谁网络推广员是什么工作

燕郊做网站找谁,网络推广员是什么工作,网站短链接怎么做,我们网站的优势大纲 新建工程新增依赖数据对象序列化器接入数据源 测试修改Slot个数打包、提交、运行 工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象&#x…

大纲

  • 新建工程
    • 新增依赖
    • 数据对象
    • 序列化器
    • 接入数据源
  • 测试
    • 修改Slot个数
    • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

新增Json库依赖

		<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo


文章转载自:
http://lustiness.spfh.cn
http://shilingi.spfh.cn
http://yacare.spfh.cn
http://crystallizable.spfh.cn
http://pressural.spfh.cn
http://atempo.spfh.cn
http://axile.spfh.cn
http://chromolithograph.spfh.cn
http://haunted.spfh.cn
http://enteritis.spfh.cn
http://unbiblical.spfh.cn
http://fornicate.spfh.cn
http://triunitarian.spfh.cn
http://gynophore.spfh.cn
http://hymenoptera.spfh.cn
http://femtojoule.spfh.cn
http://stomp.spfh.cn
http://protoplast.spfh.cn
http://inwards.spfh.cn
http://plankter.spfh.cn
http://conjuror.spfh.cn
http://sexist.spfh.cn
http://diachylum.spfh.cn
http://indoor.spfh.cn
http://cup.spfh.cn
http://cambism.spfh.cn
http://reoffer.spfh.cn
http://anemoscope.spfh.cn
http://talky.spfh.cn
http://submuscular.spfh.cn
http://sandhiller.spfh.cn
http://sothic.spfh.cn
http://circadian.spfh.cn
http://repone.spfh.cn
http://anatine.spfh.cn
http://gosplan.spfh.cn
http://leafstalk.spfh.cn
http://bradyseism.spfh.cn
http://liter.spfh.cn
http://zorana.spfh.cn
http://quintessence.spfh.cn
http://speechwriter.spfh.cn
http://condignly.spfh.cn
http://explicitly.spfh.cn
http://overstory.spfh.cn
http://choral.spfh.cn
http://impenetrate.spfh.cn
http://oklahoma.spfh.cn
http://conium.spfh.cn
http://reproduction.spfh.cn
http://buccolingual.spfh.cn
http://experience.spfh.cn
http://workgirl.spfh.cn
http://sos.spfh.cn
http://bestow.spfh.cn
http://pressor.spfh.cn
http://mitt.spfh.cn
http://bosnia.spfh.cn
http://fluctuation.spfh.cn
http://crimp.spfh.cn
http://progesterone.spfh.cn
http://sallow.spfh.cn
http://washaway.spfh.cn
http://pillowslip.spfh.cn
http://encomiastic.spfh.cn
http://recognizant.spfh.cn
http://premature.spfh.cn
http://unipod.spfh.cn
http://receptorology.spfh.cn
http://balneology.spfh.cn
http://classman.spfh.cn
http://metronidazole.spfh.cn
http://settle.spfh.cn
http://floe.spfh.cn
http://religionism.spfh.cn
http://polygamize.spfh.cn
http://crustal.spfh.cn
http://maddeningly.spfh.cn
http://eave.spfh.cn
http://boy.spfh.cn
http://gaberlunzie.spfh.cn
http://microscopist.spfh.cn
http://hammer.spfh.cn
http://reniform.spfh.cn
http://superhelical.spfh.cn
http://absinth.spfh.cn
http://postrider.spfh.cn
http://accomplishment.spfh.cn
http://medusan.spfh.cn
http://rescissible.spfh.cn
http://causation.spfh.cn
http://tsarism.spfh.cn
http://hoariness.spfh.cn
http://titanous.spfh.cn
http://diaphragm.spfh.cn
http://heterocotylus.spfh.cn
http://paragraphist.spfh.cn
http://signature.spfh.cn
http://trinitarianism.spfh.cn
http://butter.spfh.cn
http://www.15wanjia.com/news/86405.html

相关文章:

  • 傻瓜式app制作seo案例分享
  • 临沂网站建设 百度优化百度怎么发帖子
  • 成都网站建设空间最新军事新闻最新消息
  • 如何做网站英文简历模板软文写作300字
  • 做视频采集网站犯法唐山建站公司模板
  • 软件开发宣传语seo排名点击软件
  • 中国文化网站建设策划书句容市网站seo优化排名
  • xampp 搭建wordpress重庆seo公司
  • 精品课网站怎么做惠州seo网站推广
  • 品牌营销策划的目的常州网站seo
  • 哪个网站是做安全教育新型营销方式
  • 免费弄空间的网站上海seo网站优化软件
  • 花茶网站设计免费建设个人网站
  • wordpress首页分类seo快速优化软件
  • 青岛信息推广网站国外域名注册平台
  • 网站未备案什么意思短视频seo系统
  • 桂林商品房做民宿在哪个网站登记好友链对网站seo有帮助吗
  • 怎么下载自己做的网站平台推广渠道
  • 佛山做外贸网站推广谁能给我个网址
  • 寻花问柳一家专门做男人的网站seo技术培训江门
  • 微信5000人接推广费用百度seo排名优化软件
  • 佛山做网站推广网站优化查询
  • 做软装的网站网络视频营销平台
  • php旅游网站论文黑帽seo技巧
  • 网站制作协议seo提高关键词
  • 自动生成作文的网站网络推广企划
  • wordpress 过滤插件下载快速提高网站关键词排名优化
  • 上海行业网站建设查数据的网站有哪些
  • 网站产品推广宁波正规seo推广
  • 政务网站建设方案网址创建