流处理
对于 Flink 而言,流才是整个处理逻辑的底层核心,所以流批统一之后的 DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。 下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。
读取文件
要使用 Flink 的 DataStream API 读取文件,你可以使用 StreamExecutionEnvironment 类的 readTextFile() 方法。下面是一个简单的示例代码:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
public class FileReadExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据流
DataStream<String> inputStream = env.readTextFile("path/to/file.txt");
// 打印输出到控制台
inputStream.print();
// 启动作业执行
env.execute("File Read Example");
}
}在上述代码中,我们首先创建了一个 StreamExecutionEnvironment 对象 env,然后通过调用 readTextFile() 方法来读取指定路径下的文本文件,并将其转换为 DataStream<String> 对象。接下来,我们通过调用 print() 方法将数据流打印到控制台。
最后,通过调用 execute() 方法启动作业执行。请确保替换 "path/to/file.txt" 为实际的文件路径。
编译并运行以上代码后,你将看到文件中的内容被逐行打印到控制台上。
读取 socket 文本流
要使用 Flink 的 DataStream API 读取 socket 文本流,你可以使用 StreamExecutionEnvironment 类的 socketTextStream() 方法。下面是一个简单的示例代码:
在上述代码中,我们首先创建了一个 StreamExecutionEnvironment 对象 env,然后通过调用 socketTextStream() 方法来连接指定的主机名(例如 "localhost")和端口号(例如 9999),并将其转换为 DataStream<String> 对象。
接下来,我们通过调用 print() 方法将数据流打印到控制台。
最后,通过调用 execute() 方法启动作业执行。
请确保在运行代码之前,有一个正在监听指定主机和端口的 socket 服务器,并且能够发送文本数据流。
最后更新于