  • 异步图片处理服务器



    package hello;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.event.Event;
    import reactor.function.Function;

    import javax.imageio.ImageIO;
    import java.awt.*;
    import java.awt.geom.AffineTransform;
    import java.awt.image.BufferedImage;
    import java.awt.image.ImageObserver;
    import java.nio.file.Files;
    import java.nio.file.Path;

    * Uses the built-in JDK tooling for resizing an image.
    * @author Jon Brisbin
    class BufferedImageThumbnailer implements Function<Event<Path>, Path> {

    private static final ImageObserver DUMMY_OBSERVER = (img, infoflags, x, y, width, height) -> true;

    private final Logger log = LoggerFactory.getLogger(getClass());

    private final int maxLongSide;

    public BufferedImageThumbnailer(int maxLongSide) {
    this.maxLongSide = maxLongSide;

    public Path apply(Event<Path> ev) {
    try {
    Path srcPath = ev.getData();
    Path thumbnailPath = Files.createTempFile("thumbnail", ".jpg").toAbsolutePath();
    BufferedImage imgIn = ImageIO.read(srcPath.toFile());

    double scale;
    if (imgIn.getWidth() >= imgIn.getHeight()) {
    // horizontal or square image
    scale = Math.min(maxLongSide, imgIn.getWidth()) / (double) imgIn.getWidth();
    } else {
    // vertical image
    scale = Math.min(maxLongSide, imgIn.getHeight()) / (double) imgIn.getHeight();

    BufferedImage thumbnailOut = new BufferedImage((int) (scale * imgIn.getWidth()),
    (int) (scale * imgIn.getHeight()),
    Graphics2D g = thumbnailOut.createGraphics();

    AffineTransform transform = AffineTransform.getScaleInstance(scale, scale);
    g.drawImage(imgIn, transform, DUMMY_OBSERVER);
    ImageIO.write(thumbnailOut, "jpeg", thumbnailPath.toFile());

    log.info("Image thumbnail now at: {}", thumbnailPath);

    return thumbnailPath;
    } catch (Exception e) {
    throw new IllegalStateException(e.getMessage(), e);



    package hello;

    import io.netty.buffer.ByteBuf;
    import io.netty.handler.codec.http.*;
    import reactor.core.Reactor;
    import reactor.event.Event;
    import reactor.function.Consumer;
    import reactor.net.NetChannel;

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.concurrent.atomic.AtomicReference;

    import static io.netty.handler.codec.http.HttpHeaders.Names.*;
    import static io.netty.handler.codec.http.HttpResponseStatus.*;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

    * A helper class that contains the necessary Consumers for handling HTTP requests.
    public class ImageThumbnailerRestApi {

    public static final String IMG_THUMBNAIL_URI = "/image/thumbnail.jpg";
    public static final String THUMBNAIL_REQ_URI = "/thumbnail";

    * Accept an image upload via POST and notify a Reactor that the image needs to be thumbnailed. Asynchronously respond
    * to the client when the thumbnailing has completed.
    * @param channel
    * the channel on which to send an HTTP response
    * @param thumbnail
    * a reference to the shared thumbnail path
    * @param reactor
    * the Reactor on which to publish events
    * @return a consumer to handle HTTP requests
    public static Consumer<FullHttpRequest> thumbnailImage(NetChannel<FullHttpRequest, FullHttpResponse> channel,
    AtomicReference<Path> thumbnail,
    Reactor reactor) {
    return req -> {
    if (req.getMethod() != HttpMethod.POST) {
    channel.send(badRequest(req.getMethod() + " not supported for this URI"));

    // write to a temp file
    Path imgIn = null;
    try {
    imgIn = readUpload(req.content());
    } catch (IOException e) {
    throw new IllegalStateException(e.getMessage(), e);

    // Asynchronously thumbnail the image to 250px on the long side
    reactor.sendAndReceive("thumbnail", Event.wrap(imgIn), ev -> {

    * Respond to GET requests and serve the thumbnailed image, a reference to which is kept in the given {@literal
    * AtomicReference}.
    * @param channel
    * the channel on which to send an HTTP response
    * @param thumbnail
    * a reference to the shared thumbnail path
    * @return a consumer to handle HTTP requests
    public static Consumer<FullHttpRequest> serveThumbnailImage(NetChannel<FullHttpRequest, FullHttpResponse> channel,
    AtomicReference<Path> thumbnail) {
    return req -> {
    if (req.getMethod() != HttpMethod.GET) {
    channel.send(badRequest(req.getMethod() + " not supported for this URI"));
    } else {
    try {
    } catch (IOException e) {
    throw new IllegalStateException(e.getMessage(), e);

    * Respond to errors occurring on a Reactor by redirecting them to the client via an HTTP 500 error response.
    * @param channel
    * the channel on which to send an HTTP response
    * @return a consumer to handle HTTP requests
    public static Consumer<Throwable> errorHandler(NetChannel<FullHttpRequest, FullHttpResponse> channel) {
    return ev -> {
    DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
    resp.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
    resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH, resp.content().readableBytes());

    ////////////////////////// HELPER METHODS //////////////////////////
    * Read POST uploads and write them to a temp file, returning the Path to that file.
    private static Path readUpload(ByteBuf content) throws IOException {
    byte[] bytes = new byte[content.readableBytes()];

    // write to a temp file
    Path imgIn = Files.createTempFile("upload", ".jpg");
    Files.write(imgIn, bytes);


    return imgIn;

    * Create an HTTP 400 bad request response.
    public static FullHttpResponse badRequest(String msg) {
    DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST);
    resp.headers().set(CONTENT_TYPE, "text/plain");
    resp.headers().set(CONTENT_LENGTH, resp.content().readableBytes());
    return resp;

    * Create an HTTP 301 redirect response.
    public static FullHttpResponse redirect() {
    DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, MOVED_PERMANENTLY);
    resp.headers().set(CONTENT_LENGTH, 0);
    resp.headers().set(LOCATION, IMG_THUMBNAIL_URI);
    return resp;

    * Create an HTTP 200 response that contains the data of the thumbnailed image.
    public static FullHttpResponse serveImage(Path path) throws IOException {
    DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, OK);

    RandomAccessFile f = new RandomAccessFile(path.toString(), "r");
    resp.headers().set(CONTENT_TYPE, "image/jpeg");
    resp.headers().set(CONTENT_LENGTH, f.length());

    byte[] bytes = Files.readAllBytes(path);

    return resp;



    package hello;

    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import reactor.core.Environment;
    import reactor.core.Reactor;
    import reactor.core.composable.Stream;
    import reactor.core.spec.Reactors;
    import reactor.net.NetServer;
    import reactor.net.config.ServerSocketOptions;
    import reactor.net.netty.NettyServerSocketOptions;
    import reactor.net.netty.tcp.NettyTcpServer;
    import reactor.net.tcp.spec.TcpServerSpec;
    import reactor.spring.context.config.EnableReactor;

    import java.nio.file.Path;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;

    import static reactor.event.selector.Selectors.$;

    * Simple Spring Boot app to start a Reactor+Netty-based REST API server for thumbnailing uploaded images.
    public class ImageThumbnailerApp {

    public Reactor reactor(Environment env) {
    Reactor reactor = Reactors.reactor(env, Environment.THREAD_POOL);

    // Register our thumbnailer on the Reactor
    reactor.receive($("thumbnail"), new BufferedImageThumbnailer(250));

    return reactor;

    public ServerSocketOptions serverSocketOptions() {
    return new NettyServerSocketOptions()
    .pipelineConfigurer(pipeline -> pipeline.addLast(new HttpServerCodec())
    .addLast(new HttpObjectAggregator(16 * 1024 * 1024)));

    public NetServer<FullHttpRequest, FullHttpResponse> restApi(Environment env,
    ServerSocketOptions opts,
    Reactor reactor,
    CountDownLatch closeLatch) throws InterruptedException {
    AtomicReference<Path> thumbnail = new AtomicReference<>();

    NetServer<FullHttpRequest, FullHttpResponse> server = new TcpServerSpec<FullHttpRequest, FullHttpResponse>(
    .consume(ch -> {
    // filter requests by URI via the input Stream
    Stream<FullHttpRequest> in = ch.in();

    // serve image thumbnail to browser
    in.filter((FullHttpRequest req) -> ImageThumbnailerRestApi.IMG_THUMBNAIL_URI.equals(req.getUri()))
    .when(Throwable.class, ImageThumbnailerRestApi.errorHandler(ch))
    .consume(ImageThumbnailerRestApi.serveThumbnailImage(ch, thumbnail));

    // take uploaded data and thumbnail it
    in.filter((FullHttpRequest req) -> ImageThumbnailerRestApi.THUMBNAIL_REQ_URI.equals(req.getUri()))
    .when(Throwable.class, ImageThumbnailerRestApi.errorHandler(ch))
    .consume(ImageThumbnailerRestApi.thumbnailImage(ch, thumbnail, reactor));

    // shutdown this demo app
    in.filter((FullHttpRequest req) -> "/shutdown".equals(req.getUri()))
    .consume(req -> closeLatch.countDown());


    return server;

    public CountDownLatch closeLatch() {
    return new CountDownLatch(1);

    public static void main(String... args) throws InterruptedException {
    ApplicationContext ctx = SpringApplication.run(ImageThumbnailerApp.class, args);

    // Reactor's TCP servers are non-blocking so we have to do something to keep from exiting the main thread
    CountDownLatch closeLatch = ctx.getBean(CountDownLatch.class);


