package com.product.data.service;
|
|
import cn.hutool.core.map.CaseInsensitiveMap;
|
import cn.hutool.core.util.IdUtil;
|
import cn.hutool.core.util.NumberUtil;
|
import com.google.common.collect.Sets;
|
import com.product.common.lang.StringUtils;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.core.transfer.Transactional;
|
import com.product.data.config.CmnConst;
|
import com.product.data.config.DatabaseType;
|
import com.product.data.config.ErrorCode;
|
import com.product.data.connection.ConnectionManager;
|
import com.product.data.entity.DatabaseEntity;
|
import com.product.data.entity.QueryResultEntity;
|
import com.product.data.service.impl.IRemoteService;
|
import com.product.data.service.impl.ISyncDataConfigService;
|
import com.product.data.utli.QueryDataService;
|
import com.product.quartz.service.impl.SysJobService;
|
import com.product.tool.table.service.DataModelService;
|
import com.product.util.BaseUtil;
|
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
import org.quartz.SchedulerException;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
import redis.clients.jedis.Jedis;
|
import redis.clients.jedis.JedisSentinelPool;
|
|
import java.sql.Connection;
|
import java.util.*;
|
|
/**
|
* @Author cheng
|
* @Date 2022/2/8 15:57
|
* @Desc 同步数据配置Service
|
*/
|
@Service
|
public class SyncDataConfigService extends AbstractBaseService implements ISyncDataConfigService {
|
|
// Scheduler-job service; this class calls its insertJob / updateJob / deleteJob for sync timed tasks.
@Autowired
|
SysJobService sysJobService;
|
|
// Data-model service; saveSyncTableConfig passes the assembled table definition to dataModelOperation.
@Autowired
|
DataModelService dataModelService;
|
|
// Optional collaborator (required = false), so it may be null; when present, saved connection configs are forwarded to it.
@Autowired(required = false)
|
IRemoteService remoteService;
|
|
|
public FieldSetEntity findSyncTaskDetail(String uuid) {
|
FieldSetEntity fse = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER_SUB, uuid, false);
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT ");
|
sql.append(" uuid, source_field,a.field_type field_tag,target_field,master_uuid,b.* ");
|
sql.append(" FROM ");
|
sql.append(" product_sys_data_sync_manager_field a ");
|
sql.append(" JOIN ( ");
|
sql.append(" SELECT ");
|
sql.append(" `field`.uuid field_uuid, ");
|
sql.append(" table_name, ");
|
sql.append(" `table`.uuid table_uuid, ");
|
sql.append(" field_name, ");
|
sql.append(" field_description, ");
|
sql.append(" field_show_name, ");
|
sql.append(" field_type, ");
|
sql.append(" field_unit, ");
|
sql.append(" field_length, ");
|
sql.append(" is_filter, ");
|
sql.append(" field_reference ");
|
sql.append(" FROM ");
|
sql.append(" product_sys_datamodel_table `table` ");
|
sql.append(" JOIN product_sys_datamodel_field `field` ON `table`.uuid = `field`.table_uuid ");
|
sql.append(" ) b ON a.master_uuid=? and b.table_name=? and a.target_field = b.field_name ");
|
DataTableEntity dt = getBaseDao().listTable(sql.toString(), new Object[]{fse.getUUID(), fse.getString(CmnConst.TARGET_TABLE)});
|
dt.getMeta().setTableName(new Object[]{CmnConst.TABLE_SYNC_MANAGER_FIELD});
|
fse.addSubDataTable(dt);
|
fse.addSubDataTable(getBaseDao().listTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX, "table_uuid=?", new Object[]{dt.getString(0, CmnConst.TABLE_UUID)}));
|
return fse;
|
}
|
|
|
/**
|
* 删除同步表配置
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
@Transactional
|
@Override
|
public void delSyncTableConfig(FieldSetEntity fse) throws BaseException, SchedulerException {
|
Object[] params = new Object[]{fse.getUUID()};
|
getBaseDao().delete(fse.getTableName(), params);
|
//查询同步配置相关的定时任务并删除
|
FieldSetEntity timeTaskFieldSet = getBaseDao().getFieldSetEntityBySQL(
|
"SELECT * FROM product_sys_timed_task WHERE uuid= (SELECT time_task_uuid from product_sys_data_sync_manager_sub where uuid =?)", params, false);
|
if (timeTaskFieldSet != null) {
|
sysJobService.deleteJob(timeTaskFieldSet);
|
}
|
}
|
|
|
/**
|
* 操作表
|
*
|
* @param fse
|
* @throws BaseException
|
*/
|
private FieldSetEntity operationTable(FieldSetEntity fse, boolean update) throws BaseException {
|
FieldSetEntity table = new FieldSetEntity();
|
table.setTableName(CmnConst.TABLE_MODULE_TABLE);
|
// 表名
|
table.setValue(CmnConst.TABLE_NAME, fse.getString(CmnConst.TARGET_TABLE));
|
// 表类型
|
table.setValue(CmnConst.TABLE_TYPE, 1);
|
table.setValue("sequence", 1);
|
table.setValue(CmnConst.TABLE_DESCRIPTION, "数据库同步表,来源表:" + fse.getString(CmnConst.SOURCE_TABLE));
|
table.addSubDataTable(fse.getSubDataTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX));
|
DataTableEntity subDataTable = fse.getSubDataTable(CmnConst.TABLE_SYNC_MANAGER_FIELD);
|
|
DataTableEntity field = new DataTableEntity();
|
List<String> fieldName = new ArrayList<>();
|
boolean isPk = false;
|
for (int i = 0; i < subDataTable.getRows(); i++) {
|
if (i == 0) {
|
table.setValue(CmnConst.UUID, subDataTable.getString(i, CmnConst.TABLE_UUID));
|
}
|
String field_type = subDataTable.getString(i, CmnConst.FIELD_TYPE);
|
//将 sync_manager_field 子表中 field_tag 还原到 field_type 中
|
subDataTable.setFieldValue(i, CmnConst.FIELD_TYPE, subDataTable.getString(i, "field_tag"));
|
FieldSetEntity fs = subDataTable.getFieldSetEntity(i).clones();
|
fs.setValue(CmnConst.FIELD_TYPE, field_type);
|
fs.setTableName(CmnConst.TABLE_MODULE_FIELD);
|
// 将 field_uuid 写到uuid字段中
|
fs.setValue(CmnConst.UUID, fs.getString("field_uuid"));
|
field.addFieldSetEntity(fs);
|
fieldName.add(fs.getString(CmnConst.FIELD_NAME));
|
if ("pk".equalsIgnoreCase(fs.getString(CmnConst.FIELD_TYPE))) {
|
isPk = true;
|
}
|
}
|
if (!update) {
|
if (fieldName.contains("uuid")) {
|
throw new BaseException(ErrorCode.SYNC_FIELD_COLUMN_EXISTS_SAME_FIELD.getValue(), ErrorCode.SYNC_FIELD_COLUMN_EXISTS_SAME_FIELD.getText().replace("{{fieldName}}", "uuid"));
|
} else {
|
FieldSetEntity d = new FieldSetEntity();
|
d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "uuid");
|
d.setValue("field_show_name", "唯一标识");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "string");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 80);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
|
d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 1);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "唯一标识");
|
field.addFieldSetEntity(d);
|
d = new FieldSetEntity();
|
d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX);
|
d.setValue("index_name", "uuid_unique_" + IdUtil.simpleUUID().substring(0, 5));
|
d.setValue("index_fields", "uuid");
|
d.setValue("index_type", 1);
|
DataTableEntity indexSub = table.getSubDataTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX);
|
if (indexSub == null) {
|
indexSub = new DataTableEntity();
|
}
|
indexSub.addFieldSetEntity(d);
|
table.addSubDataTable(indexSub);
|
|
}
|
if (!isPk) {
|
String pkField = "id";
|
int i = 1;
|
while (fieldName.contains(pkField)) {
|
pkField = "id" + i;
|
i++;
|
}
|
FieldSetEntity d = new FieldSetEntity();
|
d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, pkField);
|
d.setValue("field_show_name", "自增主键");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "pk");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 11);
|
d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 1);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "自增主键");
|
field.addFieldSetEntity(d);
|
}
|
}
|
table.addSubDataTable(field);
|
return table;
|
}
|
|
/**
|
* 保存同步表配置
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
@Transactional
|
@Override
|
public String saveSyncTableConfig(FieldSetEntity fse) throws BaseException {
|
fse.setValue("query_type", 0);
|
BaseUtil.createCreatorAndCreationTime(fse);
|
boolean update = true;
|
if (StringUtils.isEmpty(fse.getUUID())) {
|
fse.setValue(CmnConst.UUID, IdUtil.randomUUID());
|
update = false;
|
}
|
DataTableEntity subDataTable = fse.getSubDataTable(CmnConst.TABLE_SYNC_MANAGER_FIELD);
|
if (DataTableEntity.isEmpty(subDataTable)) {
|
throw new BaseException(ErrorCode.SAVE_SYNC_TASK_FIELD_INFO_EMPTY);
|
}
|
|
FieldSetEntity tableData = operationTable(fse, update);
|
//增量同步
|
if (1 == fse.getInteger("sync_type")) {
|
//唯一字段
|
boolean unique_field = false;
|
//增量标识字段
|
boolean incremental_field = false;
|
|
boolean update_field = false;
|
for (int i = 0; i < subDataTable.getRows(); i++) {
|
String field_type = subDataTable.getString(i, CmnConst.FIELD_TYPE);
|
if (!unique_field || !incremental_field) {
|
List<String> templateType = Arrays.asList(field_type.split(","));
|
if (templateType.contains("2")) {
|
unique_field = true;
|
} else if (templateType.contains("1") && templateType.contains("3")) {
|
//增量标识和更新标识为同一字段
|
incremental_field = true;
|
update_field = true;
|
// throw new BaseException(ErrorCode.INCREMENTAL_UPDATE_CAN_NOT_SAME_FIELD);
|
} else if (templateType.contains("1")) {
|
incremental_field = true;
|
} else if (templateType.contains("3")) {
|
update_field = true;
|
}
|
} else {
|
break;
|
}
|
}
|
if (update_field && !unique_field) {
|
throw new BaseException(ErrorCode.SYNC_DATA_UNIQUE_FIELD_NOT_EXIST);
|
}
|
if (!incremental_field) {
|
throw new BaseException(ErrorCode.SYNC_DATA_INCREMENTAL_FIELD_NOT_EXIST);
|
}
|
}
|
fse.removeSubData(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX);
|
try {
|
//因为uuid是自己生成的(定时任务需要这个uuid)所以要判断 否则调用saveFieldSet 会执行更新
|
boolean clearTaskUuid = false;
|
if (StringUtils.isEmpty(fse.getString("time_task_uuid"))) {
|
// 先写入临时的(占位)定时任务表uuid 避免保存报错定时任务加入队列后无法删除(connection 被关闭)
|
fse.setValue("time_task_uuid", "tempUuid");
|
clearTaskUuid = true;
|
}
|
if (update) {
|
getBaseDao().saveFieldSetEntity(fse);
|
} else {
|
getBaseDao().add(fse);
|
}
|
if (clearTaskUuid) {
|
fse.setValue("time_task_uuid", null);
|
}
|
|
if (update && !StringUtils.isEmpty(tableData.getUUID())) {
|
FieldSetEntity uuidField = BaseUtil.getSingleInfoByCache("表字段信息", new String[]{tableData.getUUID(), "uuid"});
|
tableData.getSubData().get("product_sys_datamodel_field").addFieldSetEntity(uuidField);
|
FieldSetEntity f = getBaseDao().getFieldSetEntityByFilter("product_sys_datamodel_table_index", "table_uuid =? and index_type=1 and index_fields ='uuid' ", new Object[]{tableData.getUUID()}, false);
|
if (f != null && !StringUtils.isEmpty(f.getUUID())) {
|
DataTableEntity indexSub = tableData.getSubDataTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX);
|
if (indexSub == null) {
|
indexSub = new DataTableEntity();
|
}
|
List<FieldSetEntity> fieldSetEntity = indexSub.getFieldSetEntity(f.getUUID());
|
if (fieldSetEntity == null || fieldSetEntity.size() <= 0) {
|
indexSub.addFieldSetEntity(f);
|
tableData.addSubDataTable(indexSub);
|
}
|
}
|
}
|
dataModelService.dataModelOperation(tableData);
|
//创建同步定时任务
|
FieldSetEntity timeTaskFieldSet = createdTimeTask(fse.getString("target_table"), fse.getString("execute_time"), fse.getUUID(), fse.getString("time_task_uuid"));
|
fse.getSubData().clear();
|
//回写定时任务uuid
|
fse.setValue("time_task_uuid", timeTaskFieldSet.getUUID());
|
getBaseDao().saveFieldSetEntity(fse);
|
} catch (Exception e) {
|
e.printStackTrace();
|
throw e;
|
}
|
return fse.getUUID();
|
}
|
|
|
/**
|
* 保存同步连接配置
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
@Transactional
|
@Override
|
public String saveSyncConnectionConfig(FieldSetEntity fse) throws BaseException {
|
BaseUtil.createCreatorAndCreationTime(fse);
|
getBaseDao().saveFieldSetEntity(fse);
|
if (fse.getInteger("db_type") != null && fse.getInteger("db_type") == 5) {
|
// redis 仅仅连接一哈
|
try {
|
// System.out.println(fse.getString("ip"));
|
// System.out.println(fse.getString("port"));
|
// System.out.println(fse.getString("pass_word"));
|
// System.out.println(fse.getString("instance"));
|
Jedis jedis = new Jedis(fse.getString("ip"), fse.getInteger("port"));
|
if (!StringUtils.isEmpty(fse.getString("pass_word"))) {
|
jedis.auth(fse.getString("pass_word"));
|
}
|
int dbIndex = StringUtils.isEmpty(fse.getString("instance")) ? 0 : fse.getInteger("instance");
|
jedis.select(dbIndex);
|
jedis.connect();
|
jedis.close();
|
} catch (Exception e) {
|
throw new BaseException(ErrorCode.GET_CONNECTION_FAIL);
|
}
|
} else if (fse.getInteger("db_type") != null && fse.getInteger("db_type") == 6) {
|
// Set<String> sentinels = new HashSet<>();
|
// System.out.println(fse.getString("ip"));
|
// System.out.println(fse.getString("port"));
|
// System.out.println(fse.getString("pass_word"));
|
// System.out.println(fse.getString("instance"));
|
String[] ip = fse.getString("ip").split(",");
|
|
// sentinels.add("127.0.0.1:26390");
|
// sentinels.add("127.0.0.1:26391");
|
// sentinels.add("127.0.0.1:26392");
|
GenericObjectPoolConfig<Jedis> poolConfig = new GenericObjectPoolConfig<>();
|
poolConfig.setMaxIdle(100);
|
poolConfig.setMaxWaitMillis(10000);
|
poolConfig.setTestOnBorrow(true);
|
int connectionTimeout = 5000;
|
int soTimeout = 5000;
|
String password = null;
|
int database = 0;
|
try (JedisSentinelPool jsPool = new JedisSentinelPool(fse.getString("user_name"), Sets.newHashSet(ip), poolConfig,
|
connectionTimeout, soTimeout, password, database); Jedis jedis = jsPool.getResource()) {
|
jedis.connect();
|
} catch (Exception e) {
|
throw new BaseException(ErrorCode.GET_CONNECTION_FAIL);
|
}
|
} else {
|
DatabaseEntity databaseEntity = new DatabaseEntity(fse);
|
this.addTask(databaseEntity, fse.getUUID());
|
}
|
if (remoteService != null) {
|
remoteService.saveSyncConnectionConfig(fse);
|
}
|
return fse.getUUID();
|
}
|
|
public synchronized void addTableField(DatabaseEntity dbe, String connectionConfigUuid, String tableName) throws BaseException {
|
try (Connection connection = ConnectionManager.getConnection(dbe)) {
|
DataTableEntity dt = new DataTableEntity();
|
QueryDataService queryDataService = new QueryDataService(connection);
|
getFieldName(queryDataService, tableName, dbe, connectionConfigUuid, dt, getFieldTypeReference(dbe.getDbType()), null);
|
if (!DataTableEntity.isEmpty(dt)) {
|
getBaseDao().add(dt);
|
}
|
} catch (Exception e) {
|
throw new BaseException(e);
|
}
|
}
|
|
/**
|
* 获取来源表字段
|
*
|
* @param dbe
|
* @param uuid
|
*/
|
private synchronized void addTask(DatabaseEntity dbe, String uuid) {
|
BaseDao baseDao = getBaseDao();
|
try (Connection connection = ConnectionManager.getConnection(dbe)) {
|
if (connection != null) {
|
QueryDataService queryDataService = new QueryDataService(connection);
|
String[] tableName = getTableName(queryDataService, dbe);
|
if ((tableName != null && tableName.length > 0) || DatabaseType.PSQL.equals(dbe.getDbType()) || DatabaseType.Oracle.equals(dbe.getDbType())) {
|
Map<String, String> fieldTypeReference = getFieldTypeReference(dbe.getDbType());
|
DataTableEntity dt = new DataTableEntity();
|
if (DatabaseType.Oracle.equals(dbe.getDbType())) {
|
Map<String, List<Map<String, Object>>> tableFieldMap = getOracleFieldInfo(queryDataService);
|
tableFieldMap.forEach((k, v) -> {
|
getFieldName(queryDataService, k, dbe, uuid, dt, fieldTypeReference, v);
|
});
|
} else if (DatabaseType.PSQL.equals(dbe.getDbType())) {
|
Map<String, List<Map<String, Object>>> pSqlFieldInfo = getPSqlFieldInfo(dbe.getDbInstance(), queryDataService);
|
pSqlFieldInfo.forEach((k, v) -> {
|
getFieldName(queryDataService, k, dbe, uuid, dt, fieldTypeReference, v);
|
});
|
} else {
|
for (String table : tableName) {
|
getFieldName(queryDataService, table, dbe, uuid, dt, fieldTypeReference, null);
|
}
|
}
|
|
if (!DataTableEntity.isEmpty(dt)) {
|
baseDao.delete(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD, "sync_manager_uuid=?", new Object[]{uuid});
|
baseDao.add(dt);
|
}
|
}
|
|
}
|
} catch (BaseException e) {
|
throw e;
|
} catch (Exception e) {
|
throw new BaseException(e);
|
}
|
}
|
|
/**
|
* 创建定时任务
|
*
|
* @param target_table 目标表
|
* @param execute_time corn 表达式
|
* @param task_uuid 同步数据配置 uuid product_sys_data_sync_manager_sub
|
* @param timeTaskUuid 定时任务uuid
|
*/
|
private FieldSetEntity createdTimeTask(String target_table, String execute_time, String task_uuid, String timeTaskUuid) throws BaseException {
|
//
|
FieldSetEntity fse = new FieldSetEntity();
|
try {
|
fse.setTableName("product_sys_timed_task");
|
fse.setValue("uuid", timeTaskUuid);
|
fse.setValue("job_name", target_table + "表数据同步");//任务名称
|
fse.setValue("job_group", "system");//分组
|
fse.setValue("invoke_target", "syncDataManager.executeDataSyncTask('" + task_uuid + "')");//调用目标字符串
|
fse.setValue("cron_expression", execute_time);//cron表达式
|
fse.setValue("misfire_policy", "3");//错误执行策略 只执行一次
|
fse.setValue("concurrent", 0);//不允许并发执行
|
fse.setValue("remark", target_table + "表数据同步");
|
fse.setValue("created_by", SpringMVCContextHolder.getCurrentUser().getUser_id());
|
fse.setValue("created_utc_datetime", new Date());
|
fse.setValue("status", 1);
|
fse.setValue("is_conceal", 1);
|
if (!StringUtils.isEmpty(timeTaskUuid)) {
|
sysJobService.updateJob(fse);
|
} else {
|
sysJobService.insertJob(fse);
|
}
|
return fse;
|
} catch (Exception e) {
|
e.printStackTrace();
|
throw new BaseException(ErrorCode.CRATED_TIMED_TASK_FAIL);
|
}
|
//
|
}
|
|
/**
|
* 保存字段
|
*
|
* @param uuid
|
* @param fieldMap
|
* @return
|
*/
|
@Deprecated
|
private DataTableEntity saveTableField(String uuid, Map<String, String[]> fieldMap) {
|
DataTableEntity dt = new DataTableEntity();
|
fieldMap.forEach((k, v) -> {
|
for (String fieldName : v) {
|
FieldSetEntity field = new FieldSetEntity();
|
field.setTableName(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD);
|
field.setValue("sync_manager_uuid", uuid);
|
//表名
|
field.setValue("table_name", k);
|
//字段名
|
field.setValue("field_name", fieldName);
|
dt.addFieldSetEntity(field);
|
}
|
|
});
|
return dt;
|
|
}
|
|
public Map<String, String> getFieldTypeReference(DatabaseType dbt) {
|
Map<String, String> fieldType = new CaseInsensitiveMap<>();
|
if (dbt == null) {
|
return fieldType;
|
}
|
DataTableEntity dt = getBaseDao().listTable("product_sys_field_type_reference", "db_type=?", new Object[]{dbt.getValue()});
|
if (DataTableEntity.isEmpty(dt)) {
|
return fieldType;
|
}
|
for (int i = 0; i < dt.getRows(); i++) {
|
fieldType.put(dt.getString(i, "db_field_type").trim(), dt.getString(i, "sys_field_type").trim());
|
}
|
return fieldType;
|
}
|
|
/**
|
* oracle 字段元信息
|
*
|
* @param queryDataService
|
* @return
|
*/
|
public Map<String, List<Map<String, Object>>> getOracleFieldInfo(QueryDataService queryDataService) {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT ");
|
sql.append(" T .column_name AS column_name, ");//--列名
|
sql.append(" T .column_type AS column_type, ");//字段类型
|
sql.append(" T .data_length AS column_length, ");//字段长度
|
sql.append(" T .data_scale AS column_scale, ");//字段精度
|
sql.append(" T .column_comment AS column_comment, ");//字段注释
|
sql.append(" T .table_name AS table_name ");//字段注释
|
sql.append(" FROM ");
|
sql.append(" ( ");
|
sql.append(" SELECT ");
|
sql.append(" UB.tablespace_name AS database_name, ");
|
sql.append(" UTC.table_name AS table_name, ");
|
sql.append(" UTC.column_name AS column_name, ");
|
sql.append(" UTC.data_length AS data_length, ");
|
sql.append(" UTC.data_type AS column_type, ");
|
sql.append(" utc.data_scale AS data_scale, ");
|
sql.append(" ucc.comments AS column_comment, ");
|
sql.append(" utc.column_id, ");
|
sql.append(" utc.nullable ");
|
sql.append(" FROM ");
|
sql.append(" user_tables ub ");
|
sql.append(" LEFT JOIN user_tab_columns utc ON ub.table_name = UTC.table_name ");
|
sql.append(" LEFT JOIN user_col_comments ucc ON utc.column_name = ucc.column_name ");
|
sql.append(" AND utc.table_name = ucc.table_name ");
|
sql.append(" ) T ");
|
sql.append(" LEFT JOIN ( ");
|
sql.append(" SELECT ");
|
sql.append(" UCC.table_name AS table_name, ");
|
sql.append(" ucc.column_name AS column_name, ");
|
sql.append(" wm_concat (UC.constraint_type) AS constraint_type ");
|
sql.append(" FROM ");
|
sql.append(" user_cons_columns ucc ");
|
sql.append(" LEFT JOIN user_constraints uc ON UCC.constraint_name = UC.constraint_name ");
|
sql.append(" GROUP BY ");
|
sql.append(" UCC.table_name, ");
|
sql.append(" ucc.column_name ");
|
sql.append(" ) b ON T .table_name = b.TABLE_NAME ");
|
sql.append(" AND T .column_name = b.column_name ");
|
// sql.append(" where T.table_name='" + tableName + "' ");
|
QueryResultEntity queryResult = queryDataService.getResult(sql.toString());
|
List<Map<String, Object>> resultList = queryResult.getResult();
|
Map<String, List<Map<String, Object>>> result = new HashMap<>();
|
for (Map<String, Object> map : resultList) {
|
String tableName = map.get("table_name") == null ? (String) map.get("table_name".toUpperCase()) : (String) map.get("table_name");
|
if (tableName == null || tableName.length() == 0) {
|
continue;
|
}
|
List<Map<String, Object>> table_name = result.get(tableName);
|
if (table_name == null) {
|
table_name = new ArrayList<>();
|
result.put(tableName, table_name);
|
}
|
table_name.add(map);
|
}
|
return result;
|
}
|
|
/**
|
* Mysql 数据库 字段名称 类型 长度 注释
|
*
|
* @param tableName
|
* @param queryDataService
|
* @return
|
*/
|
public List<Map<String, Object>> getMysqlFieldInfo(String tableName, QueryDataService queryDataService) {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT ");
|
sql.append(" COLUMN_NAME column_name, ");
|
sql.append(" data_type column_type, ");
|
sql.append(" column_type length_info, ");
|
sql.append(" column_comment ");
|
sql.append(" FROM ");
|
sql.append(" INFORMATION_SCHEMA.COLUMNS ");
|
sql.append(" WHERE ");
|
sql.append(" TABLE_NAME = '" + tableName + "' ");
|
sql.append(" AND TABLE_SCHEMA = (select database()) ");
|
QueryResultEntity queryResult = queryDataService.getResult(sql.toString());
|
List<Map<String, Object>> resultList = queryResult.getResult();
|
for (int i = 0; i < resultList.size(); i++) {
|
Map<String, Object> map = resultList.get(i);
|
String length_info = (String) map.get("length_info");
|
Object[] length = {0, 0};
|
if (length_info.indexOf("(") != -1) {
|
length_info = length_info.substring(length_info.indexOf("(") + 1, length_info.indexOf(")"));
|
if (length_info.indexOf(",") != -1) {
|
//有小数
|
length = length_info.split(",");
|
} else {
|
length[0] = length_info;
|
}
|
}
|
map.put("column_length", length[0]);
|
map.put("column_scale", length[1]);
|
}
|
return resultList;
|
}
|
|
/**
|
* sqlserver 数据库 字段名称 类型 长度 注释
|
*
|
* @param tableName
|
* @param queryDataService
|
* @return
|
*/
|
public List<Map<String, Object>> getSqlServerFieldInfo(String tableName, QueryDataService queryDataService) {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT c.name as column_name,t.name as column_type,isnull(c.prec ,0)column_length,isnull(c.scale,0) column_scale ");
|
sql.append(" FROM sysobjects o ");
|
sql.append(" JOIN syscolumns c on o.id=c.id ");
|
sql.append(" JOIN systypes t on c.xusertype=t.xusertype where o.type ='u' and o.name='" + tableName + "' ");
|
|
QueryResultEntity queryResult = queryDataService.getResult(sql.toString());
|
List<Map<String, Object>> resultList = queryResult.getResult();
|
return resultList;
|
}
|
|
|
/**
|
* 查询INFORMIX数据库 字段名称 类型 长度 注释
|
*
|
* @param tableName
|
* @param queryDataService
|
* @return
|
*/
|
public List<Map<String, Object>> getInformixFieldInfo(String dbName, String tableName, QueryDataService queryDataService) {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" select c.colname column_name, ");
|
sql.append(" case c.coltype ");
|
sql.append(" when '0' then 'CHAR' ");
|
sql.append(" when '1' then 'SMALLINT' ");
|
sql.append(" when '2' then 'INTEGER' ");
|
sql.append(" when '3' then 'FLOAT' ");
|
sql.append(" when '4' then 'SMALLFLOAT' ");
|
sql.append(" when '5' then 'DECIMAL' ");
|
sql.append(" when '6' then 'SERIAL' ");
|
sql.append(" when '7' then 'DATE' ");
|
sql.append(" when '8' then 'MONEY' ");
|
sql.append(" when '9' then 'NULL' ");
|
sql.append(" when '10' then 'DATETIME' ");
|
sql.append(" when '11' then 'BYTE' ");
|
sql.append(" when '12' then 'TEXT' ");
|
sql.append(" when '13' then 'VARCHAR' ");
|
sql.append(" when '14' then 'INTERVAL' ");
|
sql.append(" when '15' then 'NCHAR' ");
|
sql.append(" when '16' then 'NVARCHAR' ");
|
sql.append(" when '17' then 'INT8' ");
|
sql.append(" when '18' then 'SERIAL8' ");
|
sql.append(" when '19' then 'SET' ");
|
sql.append(" when '20' then 'MULTISET' ");
|
sql.append(" when '21' then 'LIST' ");
|
sql.append(" when '22' then 'Unnamed ROW' ");
|
sql.append(" when '40' then 'LVARCHAR' ");
|
sql.append(" when '41' then 'CLOB' ");
|
sql.append(" when '43' then 'BLOB' ");
|
sql.append(" when '44' then 'BOOLEAN' ");
|
sql.append(" when '256' then 'CHAR' ");
|
sql.append(" when '257' then 'SMALLINT' ");
|
sql.append(" when '258' then 'INTEGER' ");
|
sql.append(" when '259' then 'FLOAT' ");
|
sql.append(" when '260' then 'REAL' ");
|
sql.append(" when '261' then 'DECIMAL' ");
|
sql.append(" when '262' then 'SERIAL' ");
|
sql.append(" when '263' then 'DATE' ");
|
sql.append(" when '264' then 'MONEY' ");
|
sql.append(" when '266' then 'DATETIME' ");
|
sql.append(" when '267' then 'BYTE' ");
|
sql.append(" when '268' then 'TEXT' ");
|
sql.append(" when '269' then 'VARCHAR' ");
|
sql.append(" when '270' then 'INTERVAL' ");
|
sql.append(" when '271' then 'NCHAR' ");
|
sql.append(" when '272' then 'NVARCHAR' ");
|
sql.append(" when '273' then 'INT8' ");
|
sql.append(" when '274' then 'SERIAL8' ");
|
sql.append(" when '275' then 'SET' ");
|
sql.append(" when '276' then 'MULTISET' ");
|
sql.append(" when '277' then 'LIST' ");
|
sql.append(" when '278' then 'Unnamed ROW' ");
|
sql.append(" when '296' then 'LVARCHAR' ");
|
sql.append(" when '297' then 'CLOB' ");
|
sql.append(" when '298' then 'BLOB' ");
|
sql.append(" when '299' then 'BOOLEAN' ");
|
sql.append(" when '4118' then 'Named ROW' ");
|
sql.append(" end column_type,c.collength column_length ");
|
sql.append(" from ( ");
|
sql.append(" select * from systables ");
|
sql.append(" where tabtype='T' and owner =? and tabname=? ");
|
sql.append(" ) t ");
|
sql.append(" left join syscolumns c on t.tabid=c.tabid ");
|
QueryResultEntity queryResult = queryDataService.getResult(sql.toString(), new Object[]{dbName, tableName}, null);
|
List<Map<String, Object>> result = queryResult.getResult();
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> fieldInfo = new CaseInsensitiveMap<>(result.get(i));
|
result.set(i, fieldInfo);
|
String columnType = (String) fieldInfo.get("column_type");
|
if (columnType != null) {
|
columnType = columnType.trim();
|
fieldInfo.put("column_type", columnType);
|
}
|
//
|
if ("prppmain".equalsIgnoreCase(tableName) && "paytimes".equalsIgnoreCase((String) fieldInfo.get("column_name"))) {
|
System.out.println(1);
|
}
|
//日期处理
|
if ("DATETIME".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) {
|
fieldInfo.put("column_length", 0);
|
} else if ("DECIMAL".equalsIgnoreCase(columnType) || "FLOAT".equalsIgnoreCase(columnType) || "SMALLFLOAT".equalsIgnoreCase(columnType) || "MONEY".equalsIgnoreCase(columnType)) {
|
if (fieldInfo.get("column_length") != null) {
|
fieldInfo.put("column_length", fieldInfo.get("column_length").toString());
|
}
|
String columnLength = (String) fieldInfo.get("column_length");
|
if (!NumberUtil.isNumber(columnLength)) {
|
fieldInfo.put("column_length", 11);
|
fieldInfo.put("column_scale", 2);
|
continue;
|
} else {
|
int colLength = NumberUtil.parseInt(columnLength);
|
int column_length = colLength >> 8;
|
int column_scale = colLength & 255;
|
if (column_scale > 30) {
|
column_scale = 30;
|
}
|
if (column_length < column_scale) {
|
column_length = column_scale;
|
}
|
fieldInfo.put("column_length", column_length);
|
fieldInfo.put("column_scale", column_scale);
|
}
|
|
} else {
|
Object columnLength = fieldInfo.get("column_length");
|
if (columnLength != null && NumberUtil.parseInt(columnLength.toString()) > 8000) {
|
fieldInfo.put("column_length", 8000);
|
}
|
fieldInfo.put("column_scale", 0);
|
}
|
}
|
return result;
|
}
|
|
/**
|
* 查询PSQL数据库 字段名称 类型 长度 注释
|
*
|
* @param queryDataService
|
* @return
|
*/
|
public Map<String, List<Map<String, Object>>> getPSqlFieldInfo(String instance, QueryDataService queryDataService) {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT case when column_length<0 then 0 else column_length end as column_length,table_name,\"column_name\",column_type,column_comment FROM ( ");
|
sql.append(" SELECT C ");
|
sql.append(" .relname AS \"table_name\", ");
|
sql.append(" A.attname AS \"column_name\", ");
|
sql.append(" ( CASE WHEN A.attlen > 0 THEN A.attlen ELSE A.atttypmod - 4 END ) AS \"column_length\", ");
|
sql.append(" replace(format_type ( A.atttypid, A.atttypmod ),substring(format_type ( A.atttypid, A.atttypmod ),\"position\"(format_type ( A.atttypid, A.atttypmod ),'('),\"position\"" +
|
"(format_type ( A.atttypid, A.atttypmod ),')')),'') AS \"column_type\", ");
|
sql.append(" d.description AS \"column_comment\" ");
|
sql.append(" FROM ");
|
sql.append(" pg_attribute ");
|
sql.append(" A LEFT JOIN pg_description d ON d.objoid = A.attrelid ");
|
sql.append(" AND d.objsubid = A.attnum ");
|
sql.append(" LEFT JOIN pg_class C ON A.attrelid = C.oid ");
|
sql.append(" LEFT JOIN pg_type T ON A.atttypid = T.oid ");
|
sql.append(" WHERE ");
|
sql.append(" A.attnum >= 0 ");
|
sql.append(" AND C.relname IN ( SELECT \"tablename\" FROM pg_tables WHERE schemaname = ? ) ");
|
sql.append(" ) A ");
|
|
QueryResultEntity queryResult = queryDataService.getResult(sql.toString(), new Object[]{instance}, null);
|
List<Map<String, Object>> resultList = queryResult.getResult();
|
Map<String, List<Map<String, Object>>> result = new HashMap<>();
|
for (int i = 0; i < resultList.size(); i++) {
|
Map<String, Object> fieldInfo = new CaseInsensitiveMap<>(resultList.get(i));
|
resultList.set(i, fieldInfo);
|
String column_type = (String) fieldInfo.get("column_type");
|
if (fieldInfo.get("column_length") == null) {
|
fieldInfo.put("column_length", 20);
|
}
|
String tableName = (String) fieldInfo.get("table_name");
|
if (StringUtils.isEmpty(tableName)) {
|
continue;
|
}
|
//小数需要转换
|
if (column_type.toLowerCase().indexOf("numeric") != -1 && NumberUtil.isNumber(fieldInfo.get("column_length").toString())) {
|
int columnLength = NumberUtil.parseInt(fieldInfo.get("column_length").toString());
|
int column_length = columnLength >> 16;
|
int column_scale = columnLength & 255;
|
if (column_scale > 30) {
|
column_scale = 30;
|
}
|
if (column_length < column_scale) {
|
column_length = column_scale;
|
}
|
fieldInfo.put("column_length", column_length);
|
fieldInfo.put("column_scale", column_scale);
|
} else if ("date".equalsIgnoreCase(column_type)) {
|
//日期
|
fieldInfo.put("column_length", 0);
|
fieldInfo.put("column_scale", 0);
|
} else if ("real".equalsIgnoreCase(column_type)) {
|
//可变精度 将小数 和长度都设为一致 PSQL 数据库 float 最大支持 15位 MYSQL 30位
|
fieldInfo.put("column_length", fieldInfo.get("column_length"));
|
fieldInfo.put("column_scale", fieldInfo.get("column_length"));
|
} else if ("text".equalsIgnoreCase(column_type)) {
|
fieldInfo.put("column_length", 8000);
|
} else {
|
fieldInfo.put("column_scale", 0);
|
}
|
List<Map<String, Object>> table_name = result.get(tableName);
|
if (table_name == null) {
|
table_name = new ArrayList<>();
|
result.put(tableName, table_name);
|
}
|
table_name.add(fieldInfo);
|
}
|
return result;
|
}
|
|
public void getFieldName(String saveTableName, QueryDataService queryDataService, String tableName, DatabaseEntity dbe, String uuid, DataTableEntity dt, Map<String, String> fieldTypeReference, List<Map<String, Object>> resultList) {
|
DatabaseType dbt = dbe.getDbType();
|
if (DatabaseType.MySql.equals(dbt)) {
|
resultList = getMysqlFieldInfo(tableName, queryDataService);
|
} else if (DatabaseType.Oracle.equals(dbt)) {
|
// resultList = getOracleFieldInfoSql(tableName, queryDataService);
|
} else if (DatabaseType.SqlServer.equals(dbt)) {
|
resultList = getSqlServerFieldInfo(tableName, queryDataService);
|
} else if (DatabaseType.Informix.equals(dbt)) {
|
// QueryResultEntity queryResult = queryDataService.getResult(" SELECT colname COLUMN_NAME from \"informix\".syscolumns where tabid = ( " +
|
// String.format(" SELECT tabid FROM \"informix\".systables WHERE tabname = '%s') ", tableName));
|
resultList = getInformixFieldInfo(dbe.getDbName(), tableName, queryDataService);
|
} else if (DatabaseType.PSQL.equals(dbt)) {
|
|
} else {
|
return;
|
}
|
if (resultList != null) {
|
for (int i = 0; i < resultList.size(); i++) {
|
// 字段名
|
Object column_mame = resultList.get(i).get("column_name");
|
if (column_mame == null) {
|
column_mame = resultList.get(i).get("column_name".toUpperCase());
|
}
|
// 注释
|
Object column_comment = resultList.get(i).get("column_comment");
|
if (column_comment == null) {
|
column_comment = resultList.get(i).get("column_comment".toUpperCase());
|
}
|
// 类型
|
Object column_type = resultList.get(i).get("column_type");
|
if (column_type == null) {
|
column_type = resultList.get(i).get("column_type".toUpperCase());
|
}
|
//长度
|
Object column_length = resultList.get(i).get("column_length");
|
if (column_length == null) {
|
column_length = resultList.get(i).get("column_length".toUpperCase());
|
}
|
// 精度
|
Object column_scale = resultList.get(i).get("column_scale");
|
if (column_scale == null) {
|
column_scale = resultList.get(i).get("column_scale".toUpperCase());
|
}
|
FieldSetEntity field = new FieldSetEntity();
|
field.setTableName(saveTableName);
|
if (CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD.equals(saveTableName)) {
|
field.setValue("sync_manager_uuid", uuid);
|
}
|
//表名
|
field.setValue("table_name", tableName.toLowerCase());
|
//字段名
|
field.setValue("field_name", column_mame.toString().toLowerCase());
|
//字段描述
|
field.setValue("field_desc", column_comment);
|
//字段长度
|
field.setValue("field_length", "text".equalsIgnoreCase(column_type != null ? column_type.toString() : "") ? 0 : column_length);
|
//字段精度
|
field.setValue("field_unit", column_scale);
|
//字段类型
|
String field_type = fieldTypeReference == null ? null : fieldTypeReference.get(column_type);
|
if (field_type == null) {
|
System.out.println(field_type);
|
}
|
field.setValue(CmnConst.FIELD_TYPE, field_type == null ? "string" : field_type);
|
|
dt.addFieldSetEntity(field);
|
}
|
}
|
|
}
|
|
/**
|
* 获取字段名称
|
*
|
* @param queryDataService
|
* @param tableName
|
* @param dbe
|
* @return
|
*/
|
public void getFieldName(QueryDataService queryDataService, String tableName, DatabaseEntity dbe, String uuid, DataTableEntity dt, Map<String, String> fieldTypeReference, List<Map<String, Object>> resultList) {
|
getFieldName(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD, queryDataService, tableName, dbe, uuid, dt, fieldTypeReference, resultList);
|
}
|
|
/**
|
* 获取表名
|
*
|
* @param queryDataService
|
* @param dbe
|
* @return
|
*/
|
public String[] getTableName(QueryDataService queryDataService, DatabaseEntity dbe) {
|
if (dbe != null && dbe.getDbType() != null) {
|
DatabaseType dbt = dbe.getDbType();
|
String sql = null;
|
if (DatabaseType.MySql.equals(dbt)) { //数据库名称
|
sql = String.format("select TABLE_NAME as table_name from information_schema.COLUMNS WHERE TABLE_SCHEMA = '%s' group by TABLE_NAME", dbe.getDbName());
|
} else if (DatabaseType.Oracle.equals(dbt)) { //用户名 分大小写
|
// sql = String.format("select TABLE_NAME as table_name from user_tab_columns GROUP BY TABLE_NAME ");
|
//oracle使用一次性查询方式获取表字段
|
return null;
|
} else if (DatabaseType.SqlServer.equals(dbt)) {
|
//数据库名称
|
sql = String.format("SELECT name as table_name FROM %s..SysObjects Where XType='U' ", dbe.getDbName());
|
} else if (DatabaseType.Informix.equals(dbt)) { //数据库名称
|
sql = String.format("SELECT tabname as table_name FROM \"informix\".systables WHERE tabtype = 'T' AND tabid >= 100 and owner ='%s'", dbe.getDbName());
|
} else if (DatabaseType.PSQL.equals(dbt)) {
|
//PSQL使用一次性查询方式获取表字段
|
return null;
|
}
|
if (sql != null && queryDataService != null) {
|
QueryResultEntity queryResult = queryDataService.getResult(sql);
|
List<Map<String, Object>> resultList = queryResult.getResult();
|
String[] tableName = new String[resultList.size()];
|
for (int i = 0; i < resultList.size(); i++) {
|
Object table_name = resultList.get(i).get("table_name");
|
if (table_name == null) {
|
table_name = resultList.get(i).get("table_name".toUpperCase());
|
}
|
tableName[i] = (String) table_name;
|
}
|
return tableName;
|
}
|
}
|
return new String[]{};
|
}
|
|
|
private void getSourceTableField(Connection connection, DatabaseType dbt, String uuid) {
|
String sql = null;
|
// if (DatabaseType.MySql.equals(dbt)) { //数据库名称
|
// sql = "select COLUMN_NAME from information_schema.COLUMNS WHERE TABLE_SCHEMA = '" + configField.getString(CmnConst.DATABASE_NAME) + "' AND TABLE_NAME = '" + tableName + "'";
|
// } else if (DatabaseType.Oracle.equals(dbt)) { //用户名 分大小写
|
// sql = "select COLUMN_NAME from all_tab_columns WHERE OWNER = '" + configField.getString(CmnConst.USER_NAME) + "' AND TABLE_NAME = '" + tableName + "'";
|
// } else if (DatabaseType.SqlServer.equals(dbt)) {
|
// //数据库名称
|
// sql = "SELECT name TABLE_NAME FROM " + fs.getString(CmnConst.DATABASE_NAME) + "..SysColumns WHERE id = ( " +
|
// " SELECT id FROM " + fs.getString(CmnConst.DATABASE_NAME) + "..SysObjects WHERE XType='U' and name = '" + tableName + "')";
|
// } else if (DatabaseType.Informix.equals(dbt)) { //数据库名称
|
// sql = " SELECT colname COLUMN_NAME from \"informix\".syscolumns where tabid = ( " +
|
// " SELECT tabid FROM \"informix\".systables WHERE tabname = '" + tableName + "'); ";
|
// }
|
}
|
}
|