大数据学习1001问 加入小组

210个成员 139个话题 创建时间:2017-10-18

客户端RPC远程调用server,被关闭是什么原因?

发表于2017-12-15 866次查看

2017-12-15 13:36:07,074 INFO  [main] ipc.CallQueueManager (CallQueueManager.java:<init>(53)) - Using callQueue class java.util.concurrent.LinkedBlockingQueue
2017-12-15 13:36:07,509 INFO  [Socket Reader #1 for port 8899] ipc.Server (Server.java:run(606)) - Starting Socket Reader #1 for port 8899
2017-12-15 13:36:07,684 INFO  [IPC Server Responder] ipc.Server (Server.java:run(836)) - IPC Server Responder: starting
2017-12-15 13:36:07,684 INFO  [IPC Server listener on 8899] ipc.Server (Server.java:run(676)) - IPC Server listener on 8899: starting
调用到了方法
2017-12-15 13:38:14,884 INFO  [Socket Reader #1 for port 8899] ipc.Server (Server.java:doRead(780)) - Socket Reader #1 for port 8899: readAndProcess from client 127.0.0.1 threw exception [java.io.IOException: 远程主机强迫关闭了一个现有的连接。]
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.hadoop.ipc.Server.channelRead(Server.java:2603)
    at org.apache.hadoop.ipc.Server.access$2800(Server.java:136)
    at org.apache.hadoop.ipc.Server$Connection.readAndProcess(Server.java:1481)
    at org.apache.hadoop.ipc.Server$Listener.doRead(Server.java:771)
    at org.apache.hadoop.ipc.Server$Listener$Reader.doRunLoop(Server.java:637)
    at org.apache.hadoop.ipc.Server$Listener$Reader.run(Server.java:608)
 

5回复
  • 2楼 王老师 2017-12-16

    同学你有源代码吗?我看看源代码是怎么写的。

    • imstarIT 2017-12-18
      package hdfs.rpc.client; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.mapreduce.Job; import hdfs.rpc.server.MyBusiness; public class RpcClient { public static void main(String[] args) throws Exception { // 创建job MyBusiness proxy=RPC.getProxy(MyBusiness.class, MyBusiness.versionID, new InetSocketAddress("localhost", 8899), new Configuration()); System.out.println(proxy.sayHello("tom")); } } package hdfs.rpc.server; import org.apache.hadoop.ipc.VersionedProtocol; public interface MyBusiness extends VersionedProtocol { // 定义版本号 public static long versionID = 1; // 定义任务 public String sayHello(String name); } package hdfs.rpc.server; import java.io.IOException; import org.apache.hadoop.ipc.ProtocolSignature; public class MyBusinessImpl implements MyBusiness { @Override public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException { // TODO Auto-generated method stub return new ProtocolSignature(MyBusiness.versionID, null); } @Override public long getProtocolVersion(String arg0, long arg1) throws IOException { // TODO Auto-generated method stub return MyBusiness.versionID; } @Override public String sayHello(String name) { // TODO Auto-generated method stub return "say" name; } } package hdfs.rpc.server; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; public class RPCServer { public static void main(String[] args) throws Exception { // 创建server RPC.Builder builder = new RPC.Builder(new Configuration()); // 使用协议 builder.setBindAddress("localhost"); builder.setPort(8899); // 发布程序 builder.setProtocol(MyBusiness.class); builder.setInstance(new MyBusinessImpl()); // 创建server Server server = builder.build(); // 启动server server.start(); } }
  • 3楼 imstarIT 2017-12-17

    package hdfs.rpc.client;

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.mapreduce.Job;

    import hdfs.rpc.server.MyBusiness;

    public class RpcClient {

        public static void main(String[] args) throws Exception {
            // 创建job
            MyBusiness proxy=RPC.getProxy(MyBusiness.class,
                    MyBusiness.versionID, new InetSocketAddress("localhost", 8899), 
                    new Configuration());
            
            System.out.println(proxy.sayHello("tom"));
            
            
        }

    }
     

  • 4楼 imstarIT 2017-12-17

    package hdfs.rpc.server;

    import org.apache.hadoop.ipc.VersionedProtocol;

    public interface MyBusiness extends VersionedProtocol {
        // 定义版本号
        public static long versionID = 1;

        // 定义任务
        public String sayHello(String name);
    }
     

  • 5楼 imstarIT 2017-12-17

    package hdfs.rpc.server;

    import java.io.IOException;

    import org.apache.hadoop.ipc.ProtocolSignature;

    public class MyBusinessImpl implements MyBusiness {

        @Override
        public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {
            // TODO Auto-generated method stub
            return new ProtocolSignature(MyBusiness.versionID, null);
        }

        @Override
        public long getProtocolVersion(String arg0, long arg1) throws IOException {
            // TODO Auto-generated method stub
            return MyBusiness.versionID;
        }

        @Override
        public String sayHello(String name) {
            // TODO Auto-generated method stub
            return "say" + name;
        }

    }
     

  • 6楼 imstarIT 2017-12-17

    package hdfs.rpc.server;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ipc.RPC.Server;

    public class RPCServer {

        public static void main(String[] args) throws Exception {
            // 创建server
            RPC.Builder builder = new RPC.Builder(new Configuration());

            // 使用协议
            builder.setBindAddress("localhost");
            builder.setPort(8899);
            // 发布程序
            builder.setProtocol(MyBusiness.class);
            builder.setInstance(new MyBusinessImpl());
            // 创建server
            Server server = builder.build();

            // 启动server
            server.start();
        }

    }
     

发表回复
你还没有登录,请先 登录或 注册!