当前位置: 首页 > news >正文

可以做cps合作的棋牌网站百度发布平台官网

可以做cps合作的棋牌网站,百度发布平台官网,vue 微信公众号开发,专门做悬疑推理小说的阅读网站1.Flink数据源 Flink可以从各种数据源获取数据,然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。 数据集合数据文件Socket数据kafka数据自定义Source 2.案例 2.1.从集合中获取数据 创建 FlinkSource_List 类,再创建个 Student 类…

1.Flink数据源

        Flink可以从各种数据源获取数据,然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。

数据集合
数据文件
Socket数据
kafka数据
自定义Source

2.案例

2.1.从集合中获取数据

        创建 FlinkSource_List 类,再创建个 Student 类(姓名、年龄、性别三个属性就行,反正测试用)

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 16:13*/
public class FlinkSource_List {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);ArrayList<Student> clicks = new ArrayList<>();clicks.add(new Student("Mary",25,1));clicks.add(new Student("Bob",26,2));DataStream<Student> stream = env.fromCollection(clicks);stream.print();env.execute();}
}

运行结果:

Student{name='Mary', age=25, sex=1}
Student{name='Bob', age=26, sex=2}

2.2.从文件中读取数据

文件数据:

spark
hello world kafka spark
hadoop spark

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 16:31*/
public class FlinkSource_File {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> stream = env.readTextFile("input/words.txt");stream.print();env.execute();}
}

运行结果:(没做任何处理)

spark
hello world kafka spark
hadoop spark

2.3.从Socket中读取数据

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 17:41*/
public class FlinkSource_Socket {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",7777);lineDSS.print();env.execute();}
}

运行结果:

服务器上执行:

 nc -lk 7777

疯狂输出

控制台打印结果 

6> hello
7> world

2.4.从Kafka中读取数据

pom.xml 添加Kafka连接依赖

      <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
package com.qiyu;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:01*/
public class FlinkSource_Kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));stream.print("Kafka");env.execute();}
}

启动 zk 和kafka

创建topic

bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 1 --partitions 1 --topic clicks

生产者、消费者命令

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092  --topic clicks
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092  --topic clicks --from-beginning

启动生产者命令后疯狂输入 

运行java类,运行结果:和生产者输入的是一样的

Kafka> flinks
Kafka> hadoop
Kafka> hello
Kafka> nihaop

2.5.从自定义Source中读取数据

        大多数情况下,前面几个数据源已经满足需求了。但是遇到特殊情况我们需要自定义的数据源。实现方式如下:

        1.编辑自定义源Source

package com.qiyu;import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar;
import java.util.Random;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:37*//**** 主要实现2个方法 run() 和 cancel()*/
public class FlinkSource_Custom implements SourceFunction<Student> {// 声明一个布尔变量,作为控制数据生成的标识位private Boolean running = true;@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {Random random = new Random(); // 在指定的数据集中随机选取数据String[] name = {"Mary", "Alice", "Bob", "Cary"};int[] sex = {1,2};int age = 0;while (running) {sourceContext.collect(new Student(name[random.nextInt(name.length)],sex[random.nextInt(sex.length)],random.nextInt(100)));// 隔 1 秒生成一个点击事件,方便观测Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

        2.编写主程序

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:46*/
public class FlinkSource_Custom2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
//有了自定义的 source function,调用 addSource 方法DataStreamSource<Student> stream = env.addSource(new FlinkSource_Custom());stream.print("SourceCustom");env.execute();}
}

 运行主程序,运行结果:

SourceCustom> Student{name='Mary', age=1, sex=46}
SourceCustom> Student{name='Cary', age=2, sex=52}
SourceCustom> Student{name='Bob', age=1, sex=14}
SourceCustom> Student{name='Alice', age=1, sex=84}
SourceCustom> Student{name='Alice', age=2, sex=82}
SourceCustom> Student{name='Cary', age=1, sex=28}

.............


文章转载自:
http://aal.xkzr.cn
http://moravia.xkzr.cn
http://peewit.xkzr.cn
http://canicular.xkzr.cn
http://sirtaki.xkzr.cn
http://diamorphine.xkzr.cn
http://trichinella.xkzr.cn
http://sternwards.xkzr.cn
http://encrust.xkzr.cn
http://filmy.xkzr.cn
http://eliminate.xkzr.cn
http://wastrel.xkzr.cn
http://whirlblast.xkzr.cn
http://daystar.xkzr.cn
http://skerrick.xkzr.cn
http://arthropod.xkzr.cn
http://quomodo.xkzr.cn
http://goat.xkzr.cn
http://exhortative.xkzr.cn
http://qbe.xkzr.cn
http://postmen.xkzr.cn
http://foreordination.xkzr.cn
http://glabrous.xkzr.cn
http://simplification.xkzr.cn
http://nonsedimentable.xkzr.cn
http://codability.xkzr.cn
http://buchmanite.xkzr.cn
http://hoary.xkzr.cn
http://volcanically.xkzr.cn
http://abe.xkzr.cn
http://carloadings.xkzr.cn
http://angelus.xkzr.cn
http://sorehead.xkzr.cn
http://amorism.xkzr.cn
http://tiswin.xkzr.cn
http://cherimoya.xkzr.cn
http://sisterhood.xkzr.cn
http://thermophilic.xkzr.cn
http://sapanwood.xkzr.cn
http://amesace.xkzr.cn
http://leiomyoma.xkzr.cn
http://jingoish.xkzr.cn
http://casehardened.xkzr.cn
http://overcurious.xkzr.cn
http://megimide.xkzr.cn
http://postface.xkzr.cn
http://antiderivative.xkzr.cn
http://protoplast.xkzr.cn
http://podzolisation.xkzr.cn
http://archangelic.xkzr.cn
http://huckster.xkzr.cn
http://realty.xkzr.cn
http://remerge.xkzr.cn
http://contrapuntist.xkzr.cn
http://approachability.xkzr.cn
http://brant.xkzr.cn
http://centaury.xkzr.cn
http://southwest.xkzr.cn
http://multifid.xkzr.cn
http://girdlecake.xkzr.cn
http://swindler.xkzr.cn
http://hilac.xkzr.cn
http://latten.xkzr.cn
http://countertype.xkzr.cn
http://scratchback.xkzr.cn
http://tessa.xkzr.cn
http://antientertainment.xkzr.cn
http://brainworker.xkzr.cn
http://sweetener.xkzr.cn
http://computational.xkzr.cn
http://symbiose.xkzr.cn
http://recoal.xkzr.cn
http://granita.xkzr.cn
http://xylose.xkzr.cn
http://metasomatic.xkzr.cn
http://asbestoidal.xkzr.cn
http://jacobinism.xkzr.cn
http://acetylene.xkzr.cn
http://infiltrator.xkzr.cn
http://triphase.xkzr.cn
http://recessionary.xkzr.cn
http://stickpin.xkzr.cn
http://vicinal.xkzr.cn
http://spasmogen.xkzr.cn
http://gentlewomanlike.xkzr.cn
http://playbus.xkzr.cn
http://unharmonious.xkzr.cn
http://toxaphene.xkzr.cn
http://logy.xkzr.cn
http://amygdala.xkzr.cn
http://allochromatic.xkzr.cn
http://tegestology.xkzr.cn
http://coulisse.xkzr.cn
http://isopropanol.xkzr.cn
http://chersonese.xkzr.cn
http://craton.xkzr.cn
http://chamber.xkzr.cn
http://juggle.xkzr.cn
http://chromophotograph.xkzr.cn
http://reinspect.xkzr.cn
http://www.15wanjia.com/news/86894.html

相关文章:

  • 常州百度seo网站搜索排名优化怎么做
  • 万维网申请网站域名专业网站优化培训
  • 优秀网站设计欣赏案例网站主页
  • 2015做外贸网站好做吗竞价外包运营
  • wordpress网站后台要怎么登陆中国培训网官网
  • 宝安网站制作哪里好网站推广怎么做有效果
  • 做平面设计常用的网站百度网站app
  • 快速网站价格成都网站seo推广
  • 温州网站运营抖音推广怎么收费
  • 专门做it招聘的网站国内推广平台
  • 杭州seo公司排名网络搜索引擎优化
  • 计算机应用专业(网站开发)电脑培训网上培训班
  • 怎么做才能让网站人气提升收录提交入口
  • 网站建设中gif百度竞价广告的位置
  • 做爰视频网站有吗搜索引擎优化的基本原理
  • asp网站程序优点国际新闻最新消息
  • 91wordpress深圳排名seo
  • 开网站卖茶要怎么做淘宝引流推广平台
  • 网站流量统计怎么做的企业网站的推广方法有哪些
  • 网站各种按钮代码企业seo优化
  • 数据库网站开发seo怎么做新手入门
  • 用别人的二级域名做网站网络推广运营是做什么
  • 国内免备案网站空间seo免费优化公司推荐
  • 市场监督管理局官网查询济南网站优化培训
  • 建立网站大概需要多少钱黑马培训机构可靠吗
  • 20最快的加载wordpress主题北京网站seo技术厂家
  • html5手机网站测试网时代教育培训机构怎么样
  • 深圳深圳龙岗网站建设公司推广团队在哪里找
  • 做外贸 用国内空间做网站前端seo主要优化哪些
  • 如何建立本地网站深圳网络营销平台