Spring WebFlux: Flux<DataBuffer> を InputStream へ変換する

備忘録。Spring WebFlux を使っている中で、Flux<DataBuffer> ((実際には FilePart.contents() から取り出した。))を単一の java.io.InputStream へ変換したいシチュエーションに出くわしたので、その方法についてのメモ。

以下のコードで変換した。

...
import org.springframework.core.io.buffer.DataBuffer;
import reactor.core.publisher.Mono;

import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Collections;
import java.util.List;
...
    private Mono<InputStream> convertFluxDataBufferAsInputStream(Flux<DataBuffer> contents) {
        return content.map(DataBuffer::asInputStream) // -> Flux<InputStream>
            .collectList() // -> Mono<List<InputStream>>
            .map((List<Input> list) -> new SequenceInputStream(Collections.enumeration(list));
    }
  • 個々の DataBufferasInputStream()InputStream へ変換
  • 複数の InputStreamSequenceInputStream で1つに結合

ちなみに Flux.reduce() で2つずつ SequenceInputStream に纏め上げることもできるが、内部では連結リストのような構造になる。サイズ次第では SequenceInputStream.readAllBytes()*1 などでスタックオーバーフローが発生する。

*1:それとは別にブロッキングへの考慮も必要。