Flink快速入门

小雷debug
2026-01-02 / 0 评论 / 7 阅读 / 正在检测是否收录...

AI摘要

本文介绍了Flink的快速入门方法,包括如何启动集群、提交任务、查看任务信息,以及如何读取有界数据和Kafka数据。文章提供了详细的命令行操作步骤和示例代码,帮助读者快速掌握Flink的基本使用方法。

Flink命令行提交任务

  1. 启动集群
bin/start-cluster.sh
  1. 在 node1 中执行以下命令启动 netcat。
nc -lk 7777
  1. 进入到 Flink 的安装路径下,在命令行使用 flink run 命令提交作业。
./bin/flink run -m node1:8081 -c cn.xzlei.chap02.StreamWordCount ./flumeDemo-1.0-SNAPSHOT.jar -param --host node1 --port 7777

参数:

–m 指定了提交到的 JobManager,

-c 指定了入口类

-param 指定了jar的自定义参数

  1. 查看指定任务信息
bin/flink-cluster.sh list [taskID]
  1. 查看所有任务信息
bin/flink-cluster.sh list

读取有界数据

package cn.xzlei.chap05

import org.apache.flink.streaming.api.scala._

import java.util.Date

case class Event(user:String,url:String,timestamp:Long)

object SourceBoundedTest {
  def main(args: Array[String]): Unit = {
    // 创建一个执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 设置并行数量为1
    env.setParallelism(1)

    // 创建一个数据列表
    val list:List[Event] = List(
      Event("Piter", "./home?id=5", new Date().getTime),
      Event("Mary", "./shop?id=8", new Date().getTime),
      Event("Jack", "./userCenter?id=78", new Date().getTime),
    )

    // 从集合中获取数据
    val stream1 = env.fromCollection(list)

    // 从元素中获取数据
    val stream2 = env.fromElements(1, 2, 3, 4, 5)

    // 从文本文件中获取数据
    val stream3 = env.readTextFile("input/test.txt")


    stream1.print("stream1")
    stream2.print("number")
    stream3.print("stream3")

    env.execute()
  }

}

读取Kafka数据

  1. 添加pom.xml文件依赖:
<!--Flink连接 Kafka相关依赖-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.14</version>
</dependency>
  1. 编写代码
package cn.xzlei.chap05

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

import java.util.Properties

object SourceKafkaTest {
  def main(args: Array[String]): Unit = {
    // 创建一个流处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 创建一个sourceKafka连接源
    val kafkaSource = KafkaSource.builder[String]()
      .setBootstrapServers("node1:9092")
      .setTopics("clicks")
      .setGroupId("consumer-group")
      .setValueOnlyDeserializer(new SimpleStringSchema())
//      .setStartingOffsets(OffsetsInitializer.latest())
      .build()
    val stream:DataStream[String] = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafkaSource")

    stream.print()

    env.execute()
  }
}
  1. 启动zookeeperhadoop集群Kafka集群
[hadoop@node1 zookeeper-3.5.7]$ bin/zkServer.sh start
[hadoop@node2 zookeeper-3.5.7]$ bin/zkServer.sh start
[hadoop@node3 zookeeper-3.5.7]$ bin/zkServer.sh start
hdfs --daemon start
[hadoop@node1 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@node2 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@node3 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
  1. 在node1节点上打开生产者,指定topic主题为clicks
# 如果没有创建主题,创建主题
[hadoop@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic clicks
# 开启生产者
[hadoop@node1 kafka]$ bin/kafka-console-producer.sh --broker-list node1:9092 --topic clicks
  1. 运行程序、在生产者中生产数据,观察控制台结果
    mjwm39tq.png
    mjwm3ic4.png
0

评论 (0)

取消