AI摘要
本文介绍了Flink的快速入门方法,包括如何启动集群、提交任务、查看任务信息,以及如何读取有界数据和Kafka数据。文章提供了详细的命令行操作步骤和示例代码,帮助读者快速掌握Flink的基本使用方法。
Flink命令行提交任务
- 启动集群
bin/start-cluster.sh- 在 node1 中执行以下命令启动 netcat。
nc -lk 7777- 进入到 Flink 的安装路径下,在命令行使用 flink run 命令提交作业。
./bin/flink run -m node1:8081 -c cn.xzlei.chap02.StreamWordCount ./flumeDemo-1.0-SNAPSHOT.jar -param --host node1 --port 7777参数:
–m 指定了提交到的 JobManager,
-c 指定了入口类
-param 指定了jar的自定义参数
- 查看指定任务信息
bin/flink-cluster.sh list [taskID]- 查看所有任务信息
bin/flink-cluster.sh list读取有界数据
package cn.xzlei.chap05
import org.apache.flink.streaming.api.scala._
import java.util.Date
case class Event(user:String,url:String,timestamp:Long)
object SourceBoundedTest {
def main(args: Array[String]): Unit = {
// 创建一个执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置并行数量为1
env.setParallelism(1)
// 创建一个数据列表
val list:List[Event] = List(
Event("Piter", "./home?id=5", new Date().getTime),
Event("Mary", "./shop?id=8", new Date().getTime),
Event("Jack", "./userCenter?id=78", new Date().getTime),
)
// 从集合中获取数据
val stream1 = env.fromCollection(list)
// 从元素中获取数据
val stream2 = env.fromElements(1, 2, 3, 4, 5)
// 从文本文件中获取数据
val stream3 = env.readTextFile("input/test.txt")
stream1.print("stream1")
stream2.print("number")
stream3.print("stream3")
env.execute()
}
}
读取Kafka数据
- 添加pom.xml文件依赖:
<!--Flink连接 Kafka相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14</version>
</dependency>- 编写代码
package cn.xzlei.chap05
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import java.util.Properties
object SourceKafkaTest {
def main(args: Array[String]): Unit = {
// 创建一个流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建一个sourceKafka连接源
val kafkaSource = KafkaSource.builder[String]()
.setBootstrapServers("node1:9092")
.setTopics("clicks")
.setGroupId("consumer-group")
.setValueOnlyDeserializer(new SimpleStringSchema())
// .setStartingOffsets(OffsetsInitializer.latest())
.build()
val stream:DataStream[String] = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafkaSource")
stream.print()
env.execute()
}
}- 启动zookeeper、hadoop集群、Kafka集群
[hadoop@node1 zookeeper-3.5.7]$ bin/zkServer.sh start
[hadoop@node2 zookeeper-3.5.7]$ bin/zkServer.sh start
[hadoop@node3 zookeeper-3.5.7]$ bin/zkServer.sh starthdfs --daemon start[hadoop@node1 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@node2 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@node3 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties- 在node1节点上打开生产者,指定topic主题为
clicks
# 如果没有创建主题,创建主题
[hadoop@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic clicks
# 开启生产者
[hadoop@node1 kafka]$ bin/kafka-console-producer.sh --broker-list node1:9092 --topic clicks- 运行程序、在生产者中生产数据,观察控制台结果


评论 (0)