在 Exchangis 中添加新的数据源并使用 DataX 进行同步

新建数据源类

首先需要新建一个数据源处理类,用于将 Exchangis 传入的任务配置转换为 DataX 所需的 JSON 格式。下面以 ClickHouse 为例,
在 exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/transform/handlers 目录下新建 ClickhouseDataxSubExchangisJobHandler.java 文件:

package com.webank.wedatasphere.exchangis.job.server.builder.transform.handlers;  
  
import com.webank.wedatasphere.exchangis.datasource.core.utils.Json;  
import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;  
import com.webank.wedatasphere.exchangis.job.domain.SubExchangisJob;  
import com.webank.wedatasphere.exchangis.job.domain.params.JobParamDefine;  
import com.webank.wedatasphere.exchangis.job.domain.params.JobParamSet;  
import com.webank.wedatasphere.exchangis.job.domain.params.JobParams;  
import com.webank.wedatasphere.exchangis.job.server.builder.JobParamConstraints;  
import com.webank.wedatasphere.exchangis.job.server.utils.SQLCommandUtils;  
import org.apache.linkis.common.exception.ErrorException;  
  
import java.util.*;  
import java.util.stream.Collectors;  
  
/**  
 * Oracle in datax */public class ClickhouseDataxSubExchangisJobHandler extends AuthEnabledSubExchangisJobHandler {  
  
    /**  
     * Host     */    private static final JobParamDefine<String> SOURCE_HOST = JobParams.define("connection[0].jdbcUrl[0].host", JobParamConstraints.HOST);  
    private static final JobParamDefine<String> SINK_HOST = JobParams.define("connection[0].jdbcUrl.host", JobParamConstraints.HOST);  
  
    /**  
     * Port     */    private static final JobParamDefine<String> SOURCE_PORT = JobParams.define("connection[0].jdbcUrl[0].port", JobParamConstraints.PORT);  
    private static final JobParamDefine<String> SINK_PORT = JobParams.define("connection[0].jdbcUrl.port", JobParamConstraints.PORT);  
  
    /**  
     * ServiceName     */    private static final JobParamDefine<String> SOURCE_DATABASE_NAME = JobParams.define("connection[0].jdbcUrl[0].database", JobParamConstraints.DATABASE);  
    private static final JobParamDefine<String> SINK_DATABASE_NAME = JobParams.define("connection[0].jdbcUrl.database", JobParamConstraints.DATABASE);  
  
    /**  
     * Table     */    private static final JobParamDefine<String> SOURCE_TABLE = JobParams.define("table", JobParamConstraints.TABLE);  
    private static final JobParamDefine<String> SINK_TABLE = JobParams.define("connection[0].table[0]", JobParamConstraints.TABLE);  
  
    /**  
     * Connect params     */    private static final JobParamDefine<Map<String, String>> SOURCE_PARAMS_MAP = JobParams.define("connection[0].jdbcUrl[0].connParams", JobParamConstraints.CONNECT_PARAMS,  
            connectParams -> Json.fromJson(connectParams, Map.class), String.class);  
    private static final JobParamDefine<Map<String, String>> SINK_PARAMS_MAP = JobParams.define("connection[0].jdbcUrl.connParams", JobParamConstraints.CONNECT_PARAMS,  
            connectParams -> Json.fromJson(connectParams, Map.class), String.class);  
  
    /**  
     * Where condition     */    private static final JobParamDefine<String> SOURCE_WHERE_CONDITION = JobParams.define(JobParamConstraints.WHERE);  
  
  
    /**  
     * Query sql     */    private static final JobParamDefine<String> QUERY_SQL = JobParams.define("connection[0].querySql[0]", job -> {  
        JobParamSet sourceParams = job.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SOURCE);  
        String where = SOURCE_WHERE_CONDITION.getValue(sourceParams);  
        List<String> columns = job.getSourceColumnsgetName).collect(Collectors.toList();  
        if (columns.isEmpty()) {  
            columns.add("*");  
        }  
        return SQLCommandUtils.contactSql(Collections.singletonList(sourceParams  
                .get(JobParamConstraints.TABLE).getValue()), null, columns, null, where);  
    }, SubExchangisJob.class);  
  
    /**  
     * SQL column     */    private static final JobParamDefine<List<String>> SQL_COLUMN = JobParams.define("column", job -> {  
        List<String> columns = job.getSinkColumnsgetName).collect(Collectors.toList();  
        if (columns.isEmpty()) {  
            columns.add("*");  
        }  
        return columns;  
    }, SubExchangisJob.class);  
  
    @Override  
    public void handleJobSource(SubExchangisJob subExchangisJob, ExchangisJobBuilderContext ctx) throws ErrorException {  
        JobParamSet paramSet = subExchangisJob.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SOURCE);  
        if (Objects.nonNull(paramSet)) {  
            Arrays.asList(sourceMappings()).forEach(define -> paramSet.addNonNull(define.get(paramSet)));  
            paramSet.add(QUERY_SQL.newParam(subExchangisJob));  
        }  
    }  
  
    @Override  
    public void handleJobSink(SubExchangisJob subExchangisJob, ExchangisJobBuilderContext ctx) throws ErrorException {  
        JobParamSet paramSet = subExchangisJob.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SINK);  
        if (Objects.nonNull(paramSet)) {  
            Arrays.asList(sinkMappings()).forEach(define -> paramSet.addNonNull(define.get(paramSet)));  
            paramSet.add(SQL_COLUMN.newParam(subExchangisJob));  
        }  
    }  
  
    @Override  
    public String dataSourceType() {  
        return "clickhouse";  
    }  
  
    @Override  
    public boolean acceptEngine(String engineType) {  
        return "datax".equalsIgnoreCase(engineType);  
    }  
  
    private JobParamDefine<?>[] sourceMappings() {  
        return new JobParamDefine[]{USERNAME, PASSWORD, SOURCE_TABLE, SOURCE_WHERE_CONDITION,  
                SOURCE_HOST, SOURCE_PORT, SOURCE_DATABASE_NAME, SOURCE_PARAMS_MAP};  
    }  
  
    public JobParamDefine<?>[] sinkMappings() {  
        return new JobParamDefine[]{USERNAME, PASSWORD, SINK_TABLE,  
                SINK_HOST, SINK_PORT, SINK_DATABASE_NAME, SINK_PARAMS_MAP};  
    }  
}

其他数据库的配置与此类似,主要需要修改的是下面这段代码:

/**
 * Returns the data source type served by this handler. When adapting the handler
 * to another database, replace "clickhouse" with that database's type.
 */
@Override
public String dataSourceType() {
    return "clickhouse";
}

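将其中的 "clickhouse" 改为对应的数据库类型即可。需要注意的是,有些数据库(如 Oracle、达梦)不使用 database,而是使用 instance、serviceName 等属性。此时需要把 database 相关的参数定义改为 serviceName,并在 sourceMappings()/sinkMappings() 中用新的定义替换 SOURCE_DATABASE_NAME/SINK_DATABASE_NAME: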

/**
 * Source service name, mapped to the reader path connection[0].jdbcUrl[0].serviceName.
 */
private static final JobParamDefine<String> SOURCE_SERVICE_NAME =
        JobParams.define("connection[0].jdbcUrl[0].serviceName", JobParamConstraints.SERVICE_NAME);

/**
 * Sink service name, mapped to the writer path connection[0].jdbcUrl.serviceName.
 */
private static final JobParamDefine<String> SINK_SERVICE_NAME =
        JobParams.define("connection[0].jdbcUrl.serviceName", JobParamConstraints.SERVICE_NAME);

在 DataX 中添加构建 jdbcUrl 的逻辑

在 Exchangis 中,生成的 DataX JSON 里的 jdbcUrl 是一个 Map 结构(包含 host、port、database、connParams 等键值对),而 DataX 需要的是字符串形式的 jdbcUrl,两者并不一致。
![exchangis构建的jdbcUrl格式](image-40.png)
![datax需要的jdbcUrl格式](image-41.png)
因此需要在 DataX 中把上面的 Map 拼装成下面这种字符串形式的 jdbcUrl。
修改 exchangis-engines/engines/datax/datax-core/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java 中的 dealJdbcAndTable 方法:原方法针对每种数据库都有一段 jdbcUrl 拼装逻辑,我们需要为 ClickHouse 添加一个对应的分支。
![原有的jdbcUrl拼装语句](image-42.png)
添加一个clickhouse的jdbcUrl拼装

else if (DATABASE_TYPE.equals(DataBaseType.ClickHouse)) {
    // Exchangis delivers each jdbcUrl entry as a map (host/port/database/connParams)
    // instead of a URL string, so the ClickHouse URL is assembled here.
    // Job-level connection params are read once. Each entry works on its own copy,
    // because getMap() may return the map stored inside originalConfig; calling
    // putAll() on it would leak one entry's params into the configuration and into
    // the following entries.
    Map<String, Object> defaultParams = originalConfig.getMap(Key.CONNPARM, new HashMap<>());
    List<Object> jdbcUrlObjects = connConf.getList(Key.JDBC_URL);
    for (Object obj : jdbcUrlObjects) {
        Map<String, Object> map = (Map<String, Object>) obj;
        Map<String, Object> parameterMap = new HashMap<>(defaultParams);
        // Entry-level params override job-level ones; an absent/null connParams is allowed.
        Object entryParams = map.get(Key.CONNPARM);
        if (entryParams != null) {
            parameterMap.putAll((Map<String, Object>) entryParams);
        }
        String parameter = parameterMap.entrySet().stream()
                .map(e -> String.join("=", e.getKey(), String.valueOf(e.getValue())))
                .collect(Collectors.joining("&"));

        // jdbc:clickhouse://host:port/database[?k1=v1&k2=v2]
        String jcUrl = Key.JDBCCLICKHOUSE + map.get(Key.HOST).toString() + ":"
                + map.get(Key.PORT).toString() + "/" + map.get(Key.DATABASE).toString();
        if (!parameter.isEmpty()) {
            jcUrl = jcUrl + "?" + parameter;
        }
        jdbcUrls.add(jcUrl);
    }
}

同样,writer 端也需要添加 ClickHouse 的 jdbcUrl 拼装逻辑(注意 writer 中的 jdbcUrl 是单个 Map,而不是列表),修改文件:
exchangis-engines/engines/datax/datax-core/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java

else if (DATABASE_TYPE.equals(DataBaseType.ClickHouse)) {
    // The writer receives a single jdbcUrl map (host/port/database/connParams), not a list.
    // The reader's Key constants supply the field names and the ClickHouse URL prefix.
    Map<String, Object> map = connConf.getMap(com.alibaba.datax.plugin.rdbms.reader.Key.JDBC_URL);
    // Work on a copy: getMap() may return the map stored inside originalConfig, and
    // putAll() must not modify the job configuration.
    Map<String, Object> parameterMap = new HashMap<>(
            originalConfig.getMap(com.alibaba.datax.plugin.rdbms.reader.Key.CONNPARM, new HashMap<>()));
    // Entry-level params override job-level ones; an absent/null connParams is allowed.
    Object entryParams = map.get(com.alibaba.datax.plugin.rdbms.reader.Key.CONNPARM);
    if (entryParams != null) {
        parameterMap.putAll((Map<String, Object>) entryParams);
    }
    // ClickHouse JDBC URLs separate query parameters with '&' (';' is SQL Server syntax).
    String parameter = parameterMap.entrySet().stream()
            .map(e -> String.join("=", e.getKey(), String.valueOf(e.getValue())))
            .collect(Collectors.joining("&"));
    // jdbc:clickhouse://host:port/database[?k1=v1&k2=v2]
    jdbcUrl = com.alibaba.datax.plugin.rdbms.reader.Key.JDBCCLICKHOUSE
            + map.get(com.alibaba.datax.plugin.rdbms.reader.Key.HOST).toString() + ":"
            + map.get(com.alibaba.datax.plugin.rdbms.reader.Key.PORT).toString() + "/"
            + map.get(com.alibaba.datax.plugin.rdbms.reader.Key.DATABASE).toString();
    if (!parameter.isEmpty()) {
        jdbcUrl = jdbcUrl + "?" + parameter;
    }
}

此外,还需要新增一些常量:在 exchangis-engines/engines/datax/datax-core/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java 中添加 ClickHouse 的 JDBC URL 前缀常量 JDBCCLICKHOUSE:

/** JDBC URL prefix for ClickHouse; host, port and database are appended to it. */
public static final String JDBCCLICKHOUSE = "jdbc:clickhouse://";

修改 exchangis-engines/engines/datax/datax-core/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java 中 ClickHouse 的 driverClassName,将其改为新版 ClickHouse JDBC 驱动的类名(需确保该驱动对应的 jar 已加入 DataX 的依赖/classpath):
![DataBaseType中ClickHouse的driverClassName](image-43.png)

// ClickHouse: type name "clickhouse" and its JDBC driver class name.
// NOTE(review): the driver is presumably loaded by class name, so the jar providing
// com.clickhouse.jdbc.ClickHouseDriver must be on DataX's classpath — confirm.
// NOTE(review): end this constant with ',' instead of ';' unless it is the last
// constant declared in the enum.
ClickHouse("clickhouse", "com.clickhouse.jdbc.ClickHouseDriver");