配置
— 焉知非鱼Configuration
配置
默认情况下,Table & SQL API 是预先配置的,可以在可接受的性能下产生准确的结果。
根据表程序的要求,可能需要调整某些参数进行优化。例如,无约束的流程序可能需要确保所需的状态大小是有上限的(参见流概念)。
概述 #
在每个表环境中,TableConfig 都提供了配置当前会话的选项。
对于常见或重要的配置选项,TableConfig 提供了 getter 和 setter 方法,并提供了详细的内联文档。
对于更高级的配置,用户可以直接访问底层的键值映射。下面的章节列出了所有可用的选项,可以用来调整 Flink Table & SQL API 程序。
注意: 由于在执行操作时,选项会在不同的时间点被读取,因此建议在实例化表环境后尽早设置配置选项。
// instantiate table environment
val tEnv: TableEnvironment = ...
// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
注意:目前,键值选项只支持 Blink 计划器。目前,键值选项只支持 Blink 计划器。
执行选项 #
以下选项可以用来调整查询执行的性能。
Key | Default | Type | Description |
---|---|---|---|
table.exec.async-lookup.buffer-capacity(Batch/Streaming) | 100 | Integer | 异步查找连接可以触发的最大异步 i/o 操作数。 |
table.exec.async-lookup.timeout(Batch/Streaming) | “3 min” | String | 异步操作完成的异步超时时间。 |
table.exec.disabled-operators(Batch) | (none) | String | 主要用于测试。一个以逗号分隔的操作符名称列表,每个名称代表一种被禁用的操作符。可以禁用的操作符包括 “NestedLoopJoin”、“ShuffleHashJoin”、“BroadcastHashJoin”、“SortMergeJoin”、“HashAgg”、“SortAgg”。默认情况下,没有任何操作符被禁用。 |
table.exec.mini-batch.allow-latency(Streaming) | “-1 ms” | String | 最大延迟可以用于 MiniBatch 来缓冲输入记录。MiniBatch 是一种优化,用于缓冲输入记录以减少状态访问。MiniBatch 会在允许的延迟间隔和达到最大缓冲记录数时触发。注意:如果 table.exec.mini-batch.enabled 被设置为 true,其值必须大于零。 |
table.exec.mini-batch.enabled(Streaming) | false | Boolean | 指定是否启用 MiniBatch 优化。MiniBatch 是对输入记录进行缓冲以减少状态访问的优化。默认情况下,这个配置是禁用的。要启用这个功能,用户应该将这个配置设置为 true。注意:如果启用了 Mini-batch,必须设置’table.exec.mini-batch.allow-latency’和’table.exec.mini-batch.size'。 |
table.exec.mini-batch.size(Streaming) | -1 | Long | MiniBatch 可以缓冲的输入记录的最大数量。MiniBatch 是对输入记录进行缓冲的优化,以减少状态访问。MiniBatch 会在允许的延迟间隔和达到最大缓冲记录数时触发。注意:MiniBatch 目前只适用于非窗口聚合。如果 table.exec.mini-batch.enabled 被设置为 true,其值必须为正。 |
table.exec.resource.default-parallelism(Batch/Streaming) | -1 | Integer | 为所有操作符(如 aggregation、join、filter)设置默认的并行性,以便与并行实例一起运行。这个配置的优先级高于 StreamExecutionEnvironment 的并行性(实际上,这个配置覆盖了 StreamExecutionEnvironment 的并行性)。值为-1 表示没有设置默认的并行性,那么它将回落到使用 StreamExecutionEnvironment 的并行性。 |
table.exec.shuffle-mode(Batch) | “ALL_EDGES_BLOCKING” | String | 设置执行 shuffle 的模式。接受的值是, ALL_EDGES_BLOCKING: 所有边缘都将使用阻塞洗牌。FORWARD_EDGES_PIPELINED: 正向边缘将使用流水线洗牌,其他边缘将使用阻塞洗牌。POINTWISE_EDGES_PIPELINED: POINTWISE_EDGES_PIPELINED: 点向边缘将使用管道式洗牌,其他边缘将被阻挡。POINTWISE_EDGES_PIPELINED: 点向边缘包括前向和重新缩放边缘。ALL_EDGES_PIPELINED: 所有的边缘都将使用 pipelined shuffle,其他的边缘则使用 blocks。所有边缘都将使用流水线洗牌。batch: 与 ALL_EDGES_BLOCKING 相同。已废弃。pipelined: 与 ALL_EDGES_PIPELINED 相同。已被弃用。注意:Blocking shuffle 意味着数据将在发送到消费者任务之前被完全生成。Pipelined shuffle 意味着数据一旦被生产出来,就会被发送到消费者任务中。 |
table.exec.sink.not-null-enforcer(Batch/Streaming) | ERROR | Enum,可能的值: [ERROR, DROP] | 表上的 NOT NULL 列约束强制要求不能将空值插入到表中。Flink 支持 “错误”(默认)和 “放弃 “执行行为。默认情况下,当 NOT NULL 列中写入空值时,Flink 会检查值并抛出运行时异常。用户可以将行为改为’drop',在不出现异常的情况下默默地删除这些记录。 |
table.exec.sort.async-merge-enabled(Batch) | true | Boolean | 是否异步合并排序后的 spill 文件。 |
table.exec.sort.default-limit(Batch) | -1 | Integer | 当用户在下单后没有设置限价时,默认限价。-1 表示该配置被忽略。 |
table.exec.sort.max-num-file-handles(Batch) | 128 | Integer | 外部合并排序的最大扇入量。它限制了每个操作者的文件句柄数。如果太小,可能会造成中间合并。但如果太大,会造成同时打开的文件太多,消耗内存,导致随机读取。 |
table.exec.source.idle-timeout(Streaming) | “-1 ms” | String | 当一个源在超时时间内没有收到任何元素时,它将被标记为暂时空闲。这样下游任务就可以提前打水印,而不需要在这个源空闲时等待它的水印。 |
table.exec.spill-compression.block-size(Batch) | “64 kb” | String | 溢出数据时做压缩时使用的内存大小。内存越大,压缩比越高,但作业会消耗更多的内存资源。 |
table.exec.spill-compression.enabled(Batch) | true | Boolean | 是否压缩溢出数据。目前我们只支持压缩溢出数据的排序和哈希-agg 和哈希-join 操作符。 |
table.exec.window-agg.buffer-size-limit(Batch) | 100000 | Integer | 设置组窗口 agg 运算符中使用的窗口元素缓冲区大小限制。 |
优化选项 #
以下选项可以用来调整查询优化器的行为,以获得更好的执行计划。
Key | Default | Type | Description |
---|---|---|---|
table.optimizer.agg-phase-strategy(Batch/Streaming) | “AUTO” | String | 聚合阶段的策略。只能设置 AUTO、TWO_PHASE 或 ONE_PHASE。AUTO:集合阶段无特殊执行器。选择两阶段聚合还是一阶段聚合取决于成本。TWO_PHASE: 强制使用两级聚合,其中包括 localAggregate 和 globalAggregate。请注意,如果聚合调用不支持优化为两阶段,我们仍将使用一个阶段的聚合。ONE_PHASE: 强制使用只有 CompleteGlobalAggregate 的单阶段聚合。 |
table.optimizer.distinct-agg.split.bucket-num(Streaming) | 1024 | Integer | 配置拆分不同聚合时的桶数。这个数字在一级聚合中用于计算一个桶键’hash_code(distinct_key) % BUCKET_NUM',这个桶键在拆分后作为一个额外的组键使用。 |
table.optimizer.distinct-agg.split.enabled(Streaming) | false | Boolean | 指示优化器是否将 distinct aggregation(例如 COUNT(DISTINCT col),SUM(DISTINCT col))分成两级。第一层聚合由一个额外的 key 进行洗牌,这个 key 是用 distinct_key 和 buckets 数量的 hashcode 计算出来的。当 distinct aggregation 中存在数据倾斜时,这种优化是非常有用的,并提供了扩展作业的能力。默认为 false。 |
table.optimizer.join-reorder-enabled(Batch/Streaming) | false | Boolean | 在优化器中启用连接重排序。默认为禁用。 |
table.optimizer.join.broadcast-threshold(Batch) | 1048576 | Long | 配置在执行连接时将向所有工作节点广播的表的最大字节数。将此值设置为-1,则禁用广播。 |
table.optimizer.reuse-source-enabled(Batch/Streaming) | true | Boolean | 当它为真时,优化器将尝试找出重复的表源并重用它们。这只有在 table.optimizer.reuse-sub-plan-enabled 为真时才会生效。 |
table.optimizer.reuse-sub-plan-enabled(Batch/Streaming) | true | Boolean | 当它为真时,优化器将尝试找出重复的子计划并重用它们。 |
table.optimizer.source.predicate-pushdown-enabled(Batch/Streaming) | true | Boolean | 当该值为真时,优化器将向下推送谓词到 FilterableTableSource 中。默认值为 true。 |
Table 选项 #
以下选项可用于调整表计划器(planner)的行为:
Key | Default | Type | Description |
---|---|---|---|
table.dynamic-table-options.enabled(Batch/Streaming) | false | Boolean | 启用或禁用 OPTIONS 提示,用于动态指定表选项,如果禁用,则如果指定了任何 OPTIONS 提示,就会产生异常。 |
table.sql-dialect(Batch/Streaming) | “default” | String | SQL 方言定义了如何解析一个 SQL 查询。不同的 SQL 方言可能支持不同的 SQL 语法。目前支持的方言有:默认和 hive。 |
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html