首页
壁纸
放映厅
留言板
更多
直播
推荐
百度一下
腾讯视频
Search
1
2026新年快乐Python代码
18 阅读
2
酷狗音乐歌手写真api分享
9 阅读
3
欢迎使用 Typecho
7 阅读
4
用百度统计为你的博客赋能
7 阅读
5
Flink快速入门
7 阅读
前端
鸿蒙开发
后端
软件测试
python
人工智能
UI设计
软件分享
教程
网络安全
大数据
登录
Search
标签搜索
python
大数据
scala
前端
爬虫
Spark
运维
百度
数据采集
flink
跨年代码
API接口
windows
HTML
javascript
端口占用
django
服务器
命令
网络
小雷Debug
累计撰写
9
篇文章
累计收到
3
条评论
首页
栏目
前端
鸿蒙开发
后端
软件测试
python
人工智能
UI设计
软件分享
教程
网络安全
大数据
页面
壁纸
放映厅
留言板
直播
推荐
百度一下
腾讯视频
搜索到
1
篇与
数据采集
的结果
2026-01-02
Flink快速入门
{music-list id="3136952023" color="#1989fa" autoplay="autoplay"/}Flink命令行提交任务启动集群bin/start-cluster.sh在 node1 中执行以下命令启动 netcat。nc -lk 7777进入到 Flink 的安装路径下,在命令行使用 flink run 命令提交作业。./bin/flink run -m node1:8081 -c cn.xzlei.chap02.StreamWordCount ./flumeDemo-1.0-SNAPSHOT.jar -param --host node1 --port 7777参数: –m 指定了提交到的 JobManager,-c 指定了入口类-param 指定了jar的自定义参数查看指定任务信息bin/flink-cluster.sh list [taskID]查看所有任务信息bin/flink-cluster.sh list读取有界数据package cn.xzlei.chap05 import org.apache.flink.streaming.api.scala._ import java.util.Date case class Event(user:String,url:String,timestamp:Long) object SourceBoundedTest { def main(args: Array[String]): Unit = { // 创建一个执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置并行数量为1 env.setParallelism(1) // 创建一个数据列表 val list:List[Event] = List( Event("Piter", "./home?id=5", new Date().getTime), Event("Mary", "./shop?id=8", new Date().getTime), Event("Jack", "./userCenter?id=78", new Date().getTime), ) // 从集合中获取数据 val stream1 = env.fromCollection(list) // 从元素中获取数据 val stream2 = env.fromElements(1, 2, 3, 4, 5) // 从文本文件中获取数据 val stream3 = env.readTextFile("input/test.txt") stream1.print("stream1") stream2.print("number") stream3.print("stream3") env.execute() } } 读取Kafka数据添加pom.xml文件依赖:<!--Flink连接 Kafka相关依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.14</version> </dependency>编写代码package cn.xzlei.chap05 import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.scala._ import java.util.Properties object SourceKafkaTest { def main(args: Array[String]): Unit = { // 创建一个流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建一个sourceKafka连接源 val kafkaSource = KafkaSource.builder[String]() .setBootstrapServers("node1:9092") .setTopics("clicks") .setGroupId("consumer-group") .setValueOnlyDeserializer(new SimpleStringSchema()) // .setStartingOffsets(OffsetsInitializer.latest()) .build() val stream:DataStream[String] = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafkaSource") stream.print() env.execute() } }启动zookeeper、hadoop集群、Kafka集群[hadoop@node1 zookeeper-3.5.7]$ bin/zkServer.sh start [hadoop@node2 zookeeper-3.5.7]$ bin/zkServer.sh start [hadoop@node3 zookeeper-3.5.7]$ bin/zkServer.sh starthdfs --daemon start[hadoop@node1 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [hadoop@node2 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [hadoop@node3 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties在node1节点上打开生产者,指定topic主题为clicks# 如果没有创建主题,创建主题 [hadoop@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic clicks # 开启生产者 [hadoop@node1 kafka]$ bin/kafka-console-producer.sh --broker-list node1:9092 --topic clicks运行程序、在生产者中生产数据,观察控制台结果
2026年01月02日
7 阅读
0 评论
0 点赞