Вопрос:

Netty Proxy для дублирования трафика

java proxy netty nio

641 просмотра

1 ответ

1106 Репутация автора

Я изучаю способ дублировать трафик с нетто-прокси на два сервера. т.е. вместо общей реализации:

Сервер1 -> Прокси -> Сервер 2

Я хотел бы сделать следующее:

Сервер 1 -> Прокси -> Сервер 2 и Сервер 3

Server3-> Proxy удален

Следовательно, каждое сообщение отправляется как на сервер 2, так и на сервер 3.

У меня просто есть одно ограничение, что связь между прокси и сервером 2 не должна блокироваться из-за сервера 3 (в случае, если сервер 3 работает медленно и т. Д.).

Я начинаю со следующего кода: https://github.com/dawnbreaks/TcpProxy

К сожалению, я не слишком знаком с Netty, но реализация кажется мне оптимальной для моих целей. Я хотел бы понять:

  1. Как создать новый канал для сервера 3
  2. Какой API переопределить для связи с сервером 3
  3. Как читать и отбрасывать сообщения с сервера 3
Автор: tsar2512 Источник Размещён: 07.03.2017 11:28

Ответы (1)


3 плюса

752 Репутация автора

Решение

видел твой чат в IRC #netty.

Несколько вещей здесь. Ваш Прокси должен иметь серверную часть, к которой подключается Сервер 1. Затем Сервер 2 и Сервер 3 должны либо исключить соединение с Прокси, либо вы можете использовать UDP (в зависимости от) для получения данных от Прокси.

У Netty есть пример прокси-сервера. Это будет работать в вашем случае, и это действительно легко для третьей части. Проще говоря, вы использовали бы существующий пример и открыли бы новое соединение, которое было бы к Серверу 3. Теперь, что вы можете сделать, это взять оба канала из Прокси (Соединения Клиента с сервером 2 и 3) Поместите их в группу каналов и запишите одно раз до двух серверов! Мой пример кода, который является редактируемым, разрешает ... обмен данными между сервером 1 и сервером 2 через прокси-сервер и взаимный разговор, в то время как сервер 3 может только принимать данные, но если сервер 3 отвечает на прокси-сервер, прокси-сервер ничего не делает , Возможно, вы захотите добавить обработчик для освобождения буферов или обработки данных, записанных обратно, которые не должны быть с сервера 3. Также отсюда это должно помочь вам начать, но посмотрите документы netty, api, примеры и ppts, они очень полезны!

Я приложу некоторый модифицированный код, чтобы показать вам, и вот ссылка на примеры.

Netty Proxy Server Примеры

Таким образом, для примера вы должны отредактировать HexDumpProxyFrontendHandler.class и просто добавить второй Bootstrap для нового клиента для сервера 3.

Текущий код

41      @Override
42      public void channelActive(ChannelHandlerContext ctx) {
43          final Channel inboundChannel = ctx.channel();
44  
45          // Start the connection attempt.
46          Bootstrap b = new Bootstrap();
47          b.group(inboundChannel.eventLoop())
48           .channel(ctx.channel().getClass())
49           .handler(new HexDumpProxyBackendHandler(inboundChannel))
50           .option(ChannelOption.AUTO_READ, false);
51          ChannelFuture f = b.connect(remoteHost, remotePort);
52          outboundChannel = f.channel();
53          f.addListener(new ChannelFutureListener() {
54              @Override
55              public void operationComplete(ChannelFuture future) {
56                  if (future.isSuccess()) {
57                      // connection complete start to read first data
58                      inboundChannel.read();
59                  } else {
60                      // Close the connection if the connection attempt has failed.
61                      inboundChannel.close();
62                  }
63              }
64          });
65      }

Отредактированный код

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.example.proxy;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;

    // As we use inboundChannel.eventLoop() when buildling the Bootstrap this does not need to be volatile as
    // the server2OutboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
    private Channel server2OutboundChannel;
    private Channel server3OutboundChannel;

    // TODO You should change this to your own executor
    private ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt to SERVER 3
        Bootstrap server3Bootstrap = new Bootstrap();
        server3Bootstrap.group(inboundChannel.eventLoop())
                .channel(ctx.channel().getClass())
                // You are only writing traffic to server 3 so you do not need to have a handler for the inbound traffic
                .handler(new DiscardServerHandler()) // EDIT
                .option(ChannelOption.AUTO_READ, false);
        ChannelFuture server3Future = server3Bootstrap.connect(remoteHost, remotePort);
        server3OutboundChannel = server3Future.channel();


        // Start the connection attempt to SERVER 2
        Bootstrap server2Bootstrap = new Bootstrap();
        server2Bootstrap.group(inboundChannel.eventLoop())
                .channel(ctx.channel().getClass())
                .handler(new HexDumpProxyBackendHandler(inboundChannel))
                .option(ChannelOption.AUTO_READ, false);
        ChannelFuture server2Future = server2Bootstrap.connect(remoteHost, remotePort);
        server2OutboundChannel = server2Future.channel();
        server2Future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });

        // Here we are going to add channels to channel group to save bytebuf work
        channels.add(server2OutboundChannel);
        channels.add(server3OutboundChannel);
    }

    // You can keep this the same below or use the commented out section
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        // You need to reference count the message +1
        msg.retain();
        if (server2OutboundChannel.isActive()) {
            server2OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
        if (server3OutboundChannel.isActive()) {
            server3OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }


        // Optional to the above code instead channel writing automatically cares for reference counting for you
//        channels.writeAndFlush(msg).addListeners(new ChannelFutureListener() {
//
//            @Override
//            public void operationComplete(ChannelFuture future) throws Exception {
//                if (future.isSuccess()) {
//                    // was able to flush out data, start to read the next chunk
//                    ctx.channel().read();
//                } else {
//                    future.channel().close();
//                }
//            }
//        });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (server2OutboundChannel != null) {
            closeOnFlush(server2OutboundChannel);
        }
        if (server3OutboundChannel != null) {
            closeOnFlush(server3OutboundChannel);
        }


        // Optionally can do this
//        channels.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

Обработчик сброса

Это может быть добавлено к серверу 3 в качестве обработчика для отбрасывания всего, что записано в прокси сервером 3. По умолчанию SimpleInboundHandlers будет отбрасывать сообщения после того, как они обработаны посредством уменьшения счетчика ссылок.

Отменить код обработчика

Автор: Mr00Anderson Размещён: 08.03.2017 08:41
Вопросы из категории :
32x32