在使用 Async Hbase 时频繁遇到 too many open file 异常,程序自动重启后会立即报错,具体报错日志如下:
- flink版本:1.16.0
- asynchbase版本:1.8.2
2023-03-08 16:15:39
org.jboss.netty.channel.ChannelException: Failed to create a selector.
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
at com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
... 25 more
对当前程序使用文件描述符数量进行监控,发现当程序抛出如下错误自动重启后,程序使用文件描述符数量激增。错误日志如下
java.io.IOException: Could not perform checkpoint 5 for operator async wait operator (2/9)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 5 for operator async wait operator (2/9)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
... 22 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
... 33 more
对文件描述符分析后发现,发现绝大部分文件描述符如下(占比:15.4k/15.9k),怀疑是程序重启后之前的文件描述符没有释放
lsof -U
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 168258 yarn *648u a_inode 0,10 0 8670 [eventpoll]
java 168258 yarn *652r FIFO 0,9 0t0 850723636 pipe
java 168258 yarn *653w FIFO 0,9 0t0 850723636 pipe
java 168258 yarn *654u a_inode 0,10 0 8670 [eventpoll]
java 168258 yarn *655r FIFO 0,9 0t0 850723637 pipe
java 168258 yarn *656w FIFO 0,9 0t0 850723637 pipe
java 168258 yarn *657u a_inode 0,10 0 8670 [eventpoll]
java 168258 yarn *658r FIFO 0,9 0t0 850723638 pipe
java 168258 yarn *659w FIFO 0,9 0t0 850723638 pipe
java 168258 yarn *660u a_inode 0,10 0 8670 [eventpoll]
java 168258 yarn *661w FIFO 0,9 0t0 850723639 pipe
pom
<dependency>
<groupId>org.hbase</groupId>
<artifactId>asynchbase</artifactId>
<version>1.8.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
关键代码如下:
SingleOutputStreamOperator asyncFunc = AsyncDataStream
.orderedWaitWithRetry(
source,
new HbaseDimensionAsyncFunc(),
60,
TimeUnit.SECONDS,
300,
AsyFixedRetry()
)
.setParallelism(9)
.uid("asyncFunc");
public class HbaseDimensionAsyncFunc extends RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>, ArrayList<HashMap<String, String>>> {
private HBaseClient client = null;
// open() 方法在 RichAsyncFunction 生命周期中只会调用一次,用于初始化 Hbase 客户端
@Override
public void open(Configuration configuration) throws Exception {
super.open(configuration);
client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
}
// asyncInvoke() 方法是异步调用的核心逻辑,用于执行 Hbase 查询操作
@Override
public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, String>>> o, ResultFuture<ArrayList<HashMap<String, String>>> resultFuture) throws Exception {
String uuid = o.f0;
List<GetRequest> requests = generateRequests(uuid);
// 使用异步 Hbase 客户端库查询 Hbase
CompletableFuture<List<KeyValue>> future = client.get(requests);
// 处理异步查询结果
future.thenApplyAsync(keyValues -> {
// 业务代码,对查询结果进行处理
ArrayList<HashMap<String, String>> result = processResult(keyValues);
return result;
}).thenAcceptAsync(result -> {
// 将处理结果发送给 Flink 算子
resultFuture.complete(Collections.singletonList(result));
});
}
}
通过代码看作业在Failover 时的确会有 HBaseClient 的资源泄露。
在 HbaseDimensionAsyncFunc 中重写一下 close 方法,释放掉 HBaseClient。
关闭资源时还需要注意一下事项:
- 数据刷盘:在关闭连接时,需要将缓冲区中的数据刷盘,即将缓冲区中的数据写入磁盘。这可以确保数据被正确地保存到文件系统中,而不会丢失或损坏。
- 资源关闭:在释放资源之前,需要确保已经完成了所有的操作,并将资源关闭。需要在调用
close()
方法之前,确保所有的数据都已经被写入磁盘,然后再关闭。 - Future 取消:在使用 Java 中的
Future
接口时,如果任务还没有完成,就需要取消它。在取消Future
时,需要使用cancel()
方法,并传递一个布尔值来指示是否需要中断任务的执行。这可以确保任务被正确地停止,并释放相关资源。