DataStream API: Overview


Flink DataStream API Programming Guide

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for example write the data to files, or to standard output (for example the command line terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines.

In order to create your own Flink DataStream program, we encourage you to start with anatomy of a Flink Program and gradually add your own stream transformations. The remaining sections act as references for additional operations and advanced features.

What is a DataStream?

The DataStream API gets its name from the special DataStream class that is used to represent a collection of data in a Flink program. You can think of them as immutable collections of data that can contain duplicates. This data can either be finite or unbounded; the API that you use to work on them is the same.

A DataStream is similar to a regular Java Collection in terms of usage but is quite different in some key ways. They are immutable, meaning that once they are created you cannot add or remove elements. You also cannot simply inspect the elements inside, but can only work on them using the DataStream API operations, which are also called transformations.

You can create an initial DataStream by adding a source in a Flink program. Then you can derive new streams from this and combine them by using API methods such as map, filter, and so on.

Anatomy of a Flink Program

Flink programs look like regular programs that transform DataStreams. Each program consists of the same basic parts:

  1. Obtain an execution environment,
  2. Load/create the initial data,
  3. Specify transformations on this data,
  4. Specify where to put the results of your computations,
  5. Trigger the program execution

We will now give an overview of each of those steps; please refer to the respective sections for more details. Note that all core classes of the Java DataStream API can be found in org.apache.flink.streaming.api.

The StreamExecutionEnvironment is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment:

getExecutionEnvironment();

createLocalEnvironment();

createRemoteEnvironment(String host, int port, String... jarFiles);
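For example, a remote environment can be created explicitly when you want to submit a program to a running cluster from code; this is only a sketch, and the host, port, and JAR path below are placeholders rather than values from this guide:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host",          // hypothetical JobManager host
        8081,                       // hypothetical port
        "/path/to/your-job.jar");   // JAR file containing your user code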

Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line, the Flink cluster manager will execute your main method and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources, the execution environment has several methods to read from files: you can just read them line by line, as CSV files, or using any of the other provided sources. To just read a text file as a sequence of lines, you can use:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

This will give you a DataStream on which you can then apply transformations to create new derived DataStreams.

You apply transformations by calling methods on DataStream with a transformation function. For example, a map transformation looks like this:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

This will create a new DataStream by converting every String in the original collection to an Integer.

Once you have a DataStream containing your final results, you can write it to an outside system by creating a sink. These are just some example methods for creating a sink:

writeAsText(String path);

print();
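Applied to the parsed stream from the map example above, these could look as follows (the output path is a placeholder):

parsed.writeAsText("file:///path/to/output");  // hypothetical output path
parsed.print();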

Once you have specified the complete program, you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of the ExecutionEnvironment, the execution will be triggered on your local machine or your program will be submitted for execution on a cluster.

The execute() method will wait for the job to finish and then return a JobExecutionResult, which contains execution times and accumulator results.
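For example, after a blocking execute() call you can read the runtime and any accumulators you registered (the accumulator name below is hypothetical):

JobExecutionResult result = env.execute("My Job");
long runtimeMillis = result.getNetRuntime();                    // wall-clock runtime of the finished job
Integer myCount = result.getAccumulatorResult("my-counter");    // hypothetical accumulator registered in the job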

If you don’t want to wait for the job to finish, you can trigger asynchronous job execution by calling executeAsync() on the StreamExecutionEnvironment. It will return a JobClient with which you can communicate with the job you just submitted. For instance, here is how to implement the semantics of execute() by using executeAsync().

final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

That last part about program execution is crucial to understanding when and how Flink operations are executed. All Flink programs are executed lazily: When the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to a dataflow graph. The operations are actually executed when the execution is explicitly triggered by an execute() call on the execution environment. Whether the program is executed locally or on a cluster depends on the type of execution environment.

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.

Example Program

The following program is a complete, working example of a streaming window word count application that counts the words coming from a web socket in 5-second windows. You can copy & paste the code to run it locally.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

To run the example program, start the input stream with netcat first from a terminal:

nc -lk 9999

Just type some words, hitting return for a new word. These will be the input to the word count program. If you want to see counts greater than 1, type the same word again and again within 5 seconds (increase the window size from 5 seconds if you cannot type that fast).

Data Sources

Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction). Flink comes with a number of pre-implemented source functions, but you can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources.
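As a minimal sketch of a custom non-parallel source (the class name and the emitted values are made up for illustration; assumes the SourceFunction import from org.apache.flink.streaming.api.functions.source), an implementation could look roughly like this:

public static class CountingSource implements SourceFunction<Long> {  // hypothetical example source

    private volatile boolean running = true;
    private long counter = 0L;

    @Override
    public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
        while (running) {
            ctx.collect(counter++);  // emit the next element downstream
            Thread.sleep(100);       // throttle this artificial source a bit
        }
    }

    @Override
    public void cancel() {
        running = false;             // called by Flink when the job is cancelled
    }
}

// attach it like any other source
DataStream<Long> numbers = env.addSource(new CountingSource());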

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.

  • readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process the data currently in the path once and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed. A usage sketch follows the notes below.

IMPLEMENTATION:

Under the hood, Flink splits the file reading process into two sub-tasks, namely directory monitoring and data reading. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the watchType), find the files to be processed, divide them into splits, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read multiple splits, one-by-one.

IMPORTANT NOTES:

  1. If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
  2. If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source scans the path once and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint.
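As a rough sketch of the monitored variant referenced above (the directory path and interval are placeholders; assumes a TextInputFormat over plain text files and the corresponding imports from org.apache.flink.api.java.io, org.apache.flink.core.fs, and org.apache.flink.streaming.api.functions.source):

TextInputFormat format = new TextInputFormat(new Path("file:///path/to/dir"));  // hypothetical directory

DataStream<String> lines = env.readFile(
        format,
        "file:///path/to/dir",
        FileProcessingMode.PROCESS_CONTINUOUSLY,  // keep re-scanning the directory for new data
        10000L);                                  // monitoring interval in milliseconds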

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Collection) - Creates a data stream from a Java java.util.Collection. All elements in the collection must be of the same type.

  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(T …) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer<>(…)). See connectors for more details.
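For instance, a Kafka source could be attached roughly as follows (a sketch that assumes the flink-connector-kafka dependency; the topic, broker address, and consumer group are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical broker address
props.setProperty("group.id", "my-consumer-group");        // hypothetical consumer group

DataStream<String> kafkaLines = env.addSource(
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));  // hypothetical topic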

DataStream Transformations

Please see operators for an overview of the available stream transformations.

Data Sinks

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(…) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema.

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
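Applied to a result stream, a couple of these could look as follows (the output path is a placeholder):

DataStream<Tuple2<String, Integer>> result = ...;

result.writeAsCsv("file:///path/to/result.csv");  // hypothetical path; each tuple becomes one CSV row
result.print("wordcount");                        // the prefix helps distinguish different print() calls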

Note that the write*() methods on DataStream are mainly intended for debugging purposes. They do not participate in Flink’s checkpointing, which means these functions usually have at-least-once semantics. How data is flushed to the target system depends on the implementation of the OutputFormat, so not all elements sent to the OutputFormat immediately show up in the target system. Also, in failure cases, those records might be lost.

For reliable, exactly-once delivery of a stream into a file system, use the StreamingFileSink. Also, custom implementations through the .addSink(…) method can participate in Flink’s checkpointing for exactly-once semantics.
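A row-encoded StreamingFileSink could be attached roughly like this (a sketch; the output directory is a placeholder and the stream is any DataStream<String>; assumes the StreamingFileSink, Path, and SimpleStringEncoder imports):

DataStream<String> stream = ...;

StreamingFileSink<String> fileSink = StreamingFileSink
        .forRowFormat(new Path("file:///path/to/output"),        // hypothetical output directory
                      new SimpleStringEncoder<String>("UTF-8"))  // write each element as a text line
        .build();

stream.addSink(fileSink);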

Iterations

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a side output or a filter. Here, we show an example using filters. First, we define an IterativeStream:

IterativeStream<Integer> iteration = input.iterate();

Then, we specify the logic that will be executed inside the loop using a series of transformations (here a simple map transformation):

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back, and the part of the stream which is propagated forward. These filters can, e.g., define the “termination” logic, where an element is allowed to propagate downstream rather than being fed back.

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

// define the iteration body
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1;
  }
});

// define which elements are fed back into the iteration
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

// define which elements are forwarded downstream
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

Execution Parameters

The StreamExecutionEnvironment contains the ExecutionConfig, which allows setting job-specific configuration values for the runtime.

Please refer to execution configuration for an explanation of most parameters. These parameters pertain specifically to the DataStream API:

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval().
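For example (a sketch; 200 ms is an arbitrary value):

env.getConfig().setAutoWatermarkInterval(200);                  // emit watermarks every 200 ms
long currentInterval = env.getConfig().getAutoWatermarkInterval();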

Fault Tolerance

State & Checkpointing describes how to enable and configure Flink’s checkpointing mechanism.

Controlling Latency

By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.

Usage:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

To maximize throughput, set setBufferTimeout(-1) which will remove the timeout and buffers will only be flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.

Debugging

Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis programs is usually an incremental process of checking results, debugging, and improving.

Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section gives some hints on how to ease the development of Flink programs.

Local Execution Environment

A LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. If you start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your program.

A LocalEnvironment is created and used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();

Collection Data Sources

Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.

Collection data sources can be used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...;
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...;
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

Note: Currently, the collection data source requires that data types and iterators implement Serializable. Furthermore, collection data sources cannot be executed in parallel (parallelism = 1).

Iterator Data Sink

Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:

import org.apache.flink.streaming.experimental.DataStreamUtils;

DataStream<Tuple2<String, Integer>> myResult = ...;
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);

Where to go next?

  • Operators: Specification of available streaming operators.
  • Event Time: Introduction to Flink’s notion of time.
  • State & Fault Tolerance: Explanation of how to develop stateful applications.
  • Connectors: Description of available input and output connectors.
