流处理

对于 Flink 而言,流才是整个处理逻辑的底层核心,所以流批统一之后的 DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。 下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

读取文件

要使用 Flink 的 DataStream API 读取文件,你可以使用 StreamExecutionEnvironment 类的 readTextFile() 方法。下面是一个简单的示例代码:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FileReadExample {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从文件中读取数据流
        DataStream<String> inputStream = env.readTextFile("path/to/file.txt");

        // 打印输出到控制台
        inputStream.print();

        // 启动作业执行
        env.execute("File Read Example");
    }
}

在上述代码中,我们首先创建了一个 StreamExecutionEnvironment 对象 env,然后通过调用 readTextFile() 方法来读取指定路径下的文本文件,并将其转换为 DataStream<String> 对象。接下来,我们通过调用 print() 方法将数据流打印到控制台。

最后,通过调用 execute() 方法启动作业执行。请确保替换 "path/to/file.txt" 为实际的文件路径。

编译并运行以上代码后,你将看到文件中的内容被逐行打印到控制台上。

读取 socket 文本流

要使用 Flink 的 DataStream API 读取 socket 文本流,你可以使用 StreamExecutionEnvironment 类的 socketTextStream() 方法。下面是一个简单的示例代码:

在上述代码中,我们首先创建了一个 StreamExecutionEnvironment 对象 env,然后通过调用 socketTextStream() 方法来连接指定的主机名(例如 "localhost")和端口号(例如 9999),并将其转换为 DataStream<String> 对象。

接下来,我们通过调用 print() 方法将数据流打印到控制台。

最后,通过调用 execute() 方法启动作业执行。

请确保在运行代码之前,有一个正在监听指定主机和端口的 socket 服务器,并且能够发送文本数据流。

最后更新于