WordCount 案例
WordCount 是一个经典的示例,在文本数据中统计每个单词出现的次数。
首先,你需要设置 Flink 的开发环境。确保你已经安装了 Java 和 Maven,并下载并解压了 Flink 发行版。
然后,创建一个新的 Maven 项目,并在 pom.xml 文件中添加以下依赖:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>接下来,创建一个名为 WordCount 的 Java 类,并编写以下代码:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 输入数据流
DataStream<String> text = env.socketTextStream("localhost", 9999);
// 单词拆分和计数
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
// 打印结果
counts.print();
// 启动执行环境
env.execute("WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}在上述代码中,我们首先创建了一个 StreamExecutionEnvironment 对象 env,然后通过 socketTextStream 方法从本地的 9999 端口接收输入数据流。
接着,我们定义了一个名为 Tokenizer 的内部类,实现了 Flink 的 FlatMapFunction 接口。在 flatMap 方法中,我们将输入的文本拆分成单词,并发送给下游算子进行计数。
最后,我们通过调用 keyBy(0) 将单词作为键值对的键,然后使用 sum(1) 进行累加计数。最终,我们通过调用 print() 方法打印出计数结果。
编写完代码后,你可以使用 Maven 编译和打包项目。运行以下命令:
完成后,在 target 目录下会生成一个名为 flink-wordcount.jar 的可执行文件。
最后,你可以在 Flink 安装目录下的 bin 目录中运行 Flink 集群,并提交作业。使用以下命令:
如果一切顺利,你将在控制台上看到单词计数结果。
最后更新于