虚位以待(AD)
虚位以待(AD)
首页 > 数据库 > DB2数据库 > 在Hbase Endpoint Coprocessor中使用coprocessor Proxy操作例子与问题解析

在Hbase Endpoint Coprocessor中使用coprocessor Proxy操作例子与问题解析
类别:DB2数据库   作者:码皇   来源:Michael的专栏     点击:

一、先说注意事项吧:1、Coprocessor启动有三种方式:配置文件、shell和程序中指定,我使用的是程序指定: static { EP_TABLE_DISCRIPTOR = new HTableDescriptor( "epTest ");

一、先说注意事项吧:

1、Coprocessor启动有三种方式:配置文件、shell和程序中指定,我使用的是程序指定:

 

    static {
    EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
    HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
    family.setInMemory(true);
    family.setMaxVersions(1);
    EP_TABLE_DISCRIPTOR.addFamily(family);
    try {
    EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
    }
    catch (IOException ioe) {
    }

上段代码中的addCoprocessor就是指定该表启动coprocessor操作。但前提是必须重启HBase才能把jar包载入进来。

 

2、如果客户端连接后出现如下问题:No matching handler **** for protocol in *** region,说明jar包还没有载入到HBaes中,确保HBase已经重启,另外检查代码中addCoprocessor("ict.wde.test.RowCountServer");的类名“RowCountServer”是否写正确了

二、说下步骤

2.1编写服务端代码:

1)接口类(固定格式)

 

    package ict.wde.test;
    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
    import java.io.File;
    import java.io.IOException;
    /** * Created by Michael on 2015/6/22. */public interface RowCountProtocol extends Coprocessor, CoprocessorProtocol {
    public long getRowCount() throws IOException;
    public long getRowCount(Filter filter) throws IOException;
    public String getStr() throws IOException;
    //public long getKeyValue() throws IOException;
    }
2)真正起作用的类
    package ict.wde.test;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
    import org.apache.hadoop.hbase.ipc.ProtocolSignature;
    import java.io.IOException;
    /** * Created by Michael on 2015/6/27. */public class RowCountServer implements RowCountProtocol {
    @Override public void start(CoprocessorEnvironment env) throws IOException {
    }
    @Override public void stop(CoprocessorEnvironment env) throws IOException {
    }
    @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
    return new ProtocolSignature(3, null);
    }
    @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
    return 3;
    }
    @Override public long getRowCount() throws IOException {
    return this.getRowCount(new FirstKeyOnlyFilter());
    }
    @Override public long getRowCount(Filter filter) throws IOException {
    return this.getRowCount(filter, false);
    }
    @Override public String getStr() throws IOException {
    String name = "Hello Doctor Michael Zhang, again!";
    return name;
    }
    // @Override// public long getKeyValueCount() {
    // return 0;
    // }
    public long getRowCount(Filter filter, boolean countKeyValue) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(1);
    if (filter != null) {
    scan.setFilter(filter);
    }
    return 1;
    }
    }

上述两个类打包jar后放入hbase的lib目录下

 

2.2客户端代码

 

    import ict.wde.test.RowCountProtocol;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.filter.Filter;
    import java.io.IOException;
    /** * Created by Michael on 2015/6/30. */public class EndpointTestClient {
    private final HTableInterface table;
    private final Configuration conf;
    private final RowCountProtocol server;
    private static final HTableDescriptor EP_TABLE_DISCRIPTOR;
    static {
    EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
    HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
    family.setInMemory(true);
    family.setMaxVersions(1);
    EP_TABLE_DISCRIPTOR.addFamily(family);
    try {
    EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
    }
    catch (IOException ioe) {
    }
    }
    public EndpointTestClient(Configuration config) throws IOException {
    conf = config;
    table = initTidTable();
    server = table.coprocessorProxy(RowCountProtocol.class, "0".getBytes());
    }
    private HTableInterface initTidTable() throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists("epTest")) {
    admin.createTable(EP_TABLE_DISCRIPTOR);
    }
    admin.close();
    return new HTable(conf, "epTest");
    }
    public String getStr() throws IOException {
    return server.getStr();
    }
    }
启动类:
    import org.apache.hadoop.conf.Configuration;
    import java.io.IOException;
    /** * Created by Michael on 2015/6/22. */public class EndpointExample {
    // private final HTableInterface table;
    // private static final Configuration conf;
    // private static final HTableDescriptor EP_TABLE_DISCRIPTOR;
    //// static {
    // conf = new Configuration();
    // conf.set("hbase.zookeeper.quorum", "ccf04:2181");
    //// EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
    // HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
    // family.setInMemory(true);
    // family.setMaxVersions(1);
    // EP_TABLE_DISCRIPTOR.addFamily(family);
    // try {
    // EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
    // }
    catch (IOException ioe) {
    //// }
    //// table = initTidTable();
    // }
    //// private HTableInterface initTidTable() throws IOException {
    // HBaseAdmin admin = new HBaseAdmin(conf);
    // if (!admin.tableExists("epTest")) {
    // admin.createTable(EP_TABLE_DISCRIPTOR);
    // }
    // admin.close();
    // return new HTable(conf, "epTest");
    // }
    public static void main(String[] agrs) throws IOException {
    Configuration conf = new Configuration();
    conf.set("hbase.zookeeper.quorum", "ccf04:2181");
    EndpointTestClient client = new EndpointTestClient(conf);
    String name = client.getStr();
    System.out.println(name);
    }
    }

相关热词搜索: 例子 问题