package com.product.data.service; import cn.hutool.core.map.CaseInsensitiveMap; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.NumberUtil; import com.google.common.collect.Sets; import com.product.common.lang.StringUtils; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.core.transfer.Transactional; import com.product.data.config.CmnConst; import com.product.data.config.DatabaseType; import com.product.data.config.ErrorCode; import com.product.data.connection.ConnectionManager; import com.product.data.entity.DatabaseEntity; import com.product.data.entity.QueryResultEntity; import com.product.data.service.impl.IRemoteService; import com.product.data.service.impl.ISyncDataConfigService; import com.product.data.utli.QueryDataService; import com.product.quartz.service.impl.SysJobService; import com.product.tool.table.service.DataModelService; import com.product.util.BaseUtil; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.quartz.SchedulerException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisSentinelPool; import java.sql.Connection; import java.util.*; /** * @Author cheng * @Date 2022/2/8 15:57 * @Desc 同步数据配置Service */ @Service public class SyncDataConfigService extends AbstractBaseService implements ISyncDataConfigService { @Autowired SysJobService sysJobService; @Autowired DataModelService dataModelService; @Autowired(required = false) IRemoteService remoteService; public FieldSetEntity findSyncTaskDetail(String uuid) { FieldSetEntity fse = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER_SUB, uuid, false); StringBuilder sql = new StringBuilder(); sql.append(" SELECT "); sql.append(" uuid, source_field,a.field_type field_tag,target_field,master_uuid,b.* "); sql.append(" FROM "); sql.append(" product_sys_data_sync_manager_field a "); sql.append(" JOIN ( "); sql.append(" SELECT "); sql.append(" `field`.uuid field_uuid, "); sql.append(" table_name, "); sql.append(" `table`.uuid table_uuid, "); sql.append(" field_name, "); sql.append(" field_description, "); sql.append(" field_show_name, "); sql.append(" field_type, "); sql.append(" field_unit, "); sql.append(" field_length, "); sql.append(" is_filter, "); sql.append(" field_reference "); sql.append(" FROM "); sql.append(" product_sys_datamodel_table `table` "); sql.append(" JOIN product_sys_datamodel_field `field` ON `table`.uuid = `field`.table_uuid "); sql.append(" ) b ON a.master_uuid=? and b.table_name=? and a.target_field = b.field_name "); DataTableEntity dt = getBaseDao().listTable(sql.toString(), new Object[]{fse.getUUID(), fse.getString(CmnConst.TARGET_TABLE)}); dt.getMeta().setTableName(new Object[]{CmnConst.TABLE_SYNC_MANAGER_FIELD}); fse.addSubDataTable(dt); fse.addSubDataTable(getBaseDao().listTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX, "table_uuid=?", new Object[]{dt.getString(0, CmnConst.TABLE_UUID)})); return fse; } /** * 删除同步表配置 * * @param fse * @return * @throws BaseException */ @Transactional @Override public void delSyncTableConfig(FieldSetEntity fse) throws BaseException, SchedulerException { Object[] params = new Object[]{fse.getUUID()}; getBaseDao().delete(fse.getTableName(), params); //查询同步配置相关的定时任务并删除 FieldSetEntity timeTaskFieldSet = getBaseDao().getFieldSetEntityBySQL( "SELECT * FROM product_sys_timed_task WHERE uuid= (SELECT time_task_uuid from product_sys_data_sync_manager_sub where uuid =?)", params, false); if (timeTaskFieldSet != null) { sysJobService.deleteJob(timeTaskFieldSet); } } /** * 操作表 * * @param fse * @throws BaseException */ private FieldSetEntity operationTable(FieldSetEntity fse, boolean update) throws BaseException { FieldSetEntity table = new FieldSetEntity(); table.setTableName(CmnConst.TABLE_MODULE_TABLE); // 表名 table.setValue(CmnConst.TABLE_NAME, fse.getString(CmnConst.TARGET_TABLE)); // 表类型 table.setValue(CmnConst.TABLE_TYPE, 1); table.setValue("sequence", 1); table.setValue(CmnConst.TABLE_DESCRIPTION, "数据库同步表,来源表:" + fse.getString(CmnConst.SOURCE_TABLE)); table.addSubDataTable(fse.getSubDataTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX)); DataTableEntity subDataTable = fse.getSubDataTable(CmnConst.TABLE_SYNC_MANAGER_FIELD); DataTableEntity field = new DataTableEntity(); List fieldName = new ArrayList<>(); boolean isPk = false; for (int i = 0; i < subDataTable.getRows(); i++) { if (i == 0) { table.setValue(CmnConst.UUID, subDataTable.getString(i, CmnConst.TABLE_UUID)); } String field_type = subDataTable.getString(i, CmnConst.FIELD_TYPE); //将 sync_manager_field 子表中 field_tag 还原到 field_type 中 subDataTable.setFieldValue(i, CmnConst.FIELD_TYPE, subDataTable.getString(i, "field_tag")); FieldSetEntity fs = subDataTable.getFieldSetEntity(i).clones(); fs.setValue(CmnConst.FIELD_TYPE, field_type); fs.setTableName(CmnConst.TABLE_MODULE_FIELD); // 将 field_uuid 写到uuid字段中 fs.setValue(CmnConst.UUID, fs.getString("field_uuid")); field.addFieldSetEntity(fs); fieldName.add(fs.getString(CmnConst.FIELD_NAME)); if ("pk".equalsIgnoreCase(fs.getString(CmnConst.FIELD_TYPE))) { isPk = true; } } if (!update) { if (fieldName.contains("uuid")) { throw new BaseException(ErrorCode.SYNC_FIELD_COLUMN_EXISTS_SAME_FIELD.getValue(), ErrorCode.SYNC_FIELD_COLUMN_EXISTS_SAME_FIELD.getText().replace("{{fieldName}}", "uuid")); } else { FieldSetEntity d = new FieldSetEntity(); d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD); d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "uuid"); d.setValue("field_show_name", "唯一标识"); d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "string"); d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 80); d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0); d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 1); d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "唯一标识"); field.addFieldSetEntity(d); d = new FieldSetEntity(); d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX); d.setValue("index_name", "uuid_unique_" + IdUtil.simpleUUID().substring(0, 5)); d.setValue("index_fields", "uuid"); d.setValue("index_type", 1); DataTableEntity indexSub = table.getSubDataTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX); if (indexSub == null) { indexSub = new DataTableEntity(); } indexSub.addFieldSetEntity(d); table.addSubDataTable(indexSub); } if (!isPk) { String pkField = "id"; int i = 1; while (fieldName.contains(pkField)) { pkField = "id" + i; i++; } FieldSetEntity d = new FieldSetEntity(); d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD); d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, pkField); d.setValue("field_show_name", "自增主键"); d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "pk"); d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 11); d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 1); d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0); d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "自增主键"); field.addFieldSetEntity(d); } } table.addSubDataTable(field); return table; } /** * 保存同步表配置 * * @param fse * @return * @throws BaseException */ @Transactional @Override public String saveSyncTableConfig(FieldSetEntity fse) throws BaseException { fse.setValue("query_type", 0); BaseUtil.createCreatorAndCreationTime(fse); boolean update = true; if (StringUtils.isEmpty(fse.getUUID())) { fse.setValue(CmnConst.UUID, IdUtil.randomUUID()); update = false; } DataTableEntity subDataTable = fse.getSubDataTable(CmnConst.TABLE_SYNC_MANAGER_FIELD); if (DataTableEntity.isEmpty(subDataTable)) { throw new BaseException(ErrorCode.SAVE_SYNC_TASK_FIELD_INFO_EMPTY); } FieldSetEntity tableData = operationTable(fse, update); //增量同步 if (1 == fse.getInteger("sync_type")) { //唯一字段 boolean unique_field = false; //增量标识字段 boolean incremental_field = false; boolean update_field = false; for (int i = 0; i < subDataTable.getRows(); i++) { String field_type = subDataTable.getString(i, CmnConst.FIELD_TYPE); if (!unique_field || !incremental_field) { List templateType = Arrays.asList(field_type.split(",")); if (templateType.contains("2")) { unique_field = true; } else if (templateType.contains("1") && templateType.contains("3")) { //增量标识和更新标识为同一字段 incremental_field = true; update_field = true; // throw new BaseException(ErrorCode.INCREMENTAL_UPDATE_CAN_NOT_SAME_FIELD); } else if (templateType.contains("1")) { incremental_field = true; } else if (templateType.contains("3")) { update_field = true; } } else { break; } } if (update_field && !unique_field) { throw new BaseException(ErrorCode.SYNC_DATA_UNIQUE_FIELD_NOT_EXIST); } if (!incremental_field) { throw new BaseException(ErrorCode.SYNC_DATA_INCREMENTAL_FIELD_NOT_EXIST); } } fse.removeSubData(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX); try { //因为uuid是自己生成的(定时任务需要这个uuid)所以要判断 否则调用saveFieldSet 会执行更新 boolean clearTaskUuid = false; if (StringUtils.isEmpty(fse.getString("time_task_uuid"))) { // 先写入临时的(占位)定时任务表uuid 避免保存报错定时任务加入队列后无法删除(connection 被关闭) fse.setValue("time_task_uuid", "tempUuid"); clearTaskUuid = true; } if (update) { getBaseDao().saveFieldSetEntity(fse); } else { getBaseDao().add(fse); } if (clearTaskUuid) { fse.setValue("time_task_uuid", null); } if (update && !StringUtils.isEmpty(tableData.getUUID())) { FieldSetEntity uuidField = BaseUtil.getSingleInfoByCache("表字段信息", new String[]{tableData.getUUID(), "uuid"}); tableData.getSubData().get("product_sys_datamodel_field").addFieldSetEntity(uuidField); FieldSetEntity f = getBaseDao().getFieldSetEntityByFilter("product_sys_datamodel_table_index", "table_uuid =? and index_type=1 and index_fields ='uuid' ", new Object[]{tableData.getUUID()}, false); if (f != null && !StringUtils.isEmpty(f.getUUID())) { DataTableEntity indexSub = tableData.getSubDataTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX); if (indexSub == null) { indexSub = new DataTableEntity(); } List fieldSetEntity = indexSub.getFieldSetEntity(f.getUUID()); if (fieldSetEntity == null || fieldSetEntity.size() <= 0) { indexSub.addFieldSetEntity(f); tableData.addSubDataTable(indexSub); } } } dataModelService.dataModelOperation(tableData); //创建同步定时任务 FieldSetEntity timeTaskFieldSet = createdTimeTask(fse.getString("target_table"), fse.getString("execute_time"), fse.getUUID(), fse.getString("time_task_uuid")); fse.getSubData().clear(); //回写定时任务uuid fse.setValue("time_task_uuid", timeTaskFieldSet.getUUID()); getBaseDao().saveFieldSetEntity(fse); } catch (Exception e) { e.printStackTrace(); throw e; } return fse.getUUID(); } /** * 保存同步连接配置 * * @param fse * @return * @throws BaseException */ @Transactional @Override public String saveSyncConnectionConfig(FieldSetEntity fse) throws BaseException { BaseUtil.createCreatorAndCreationTime(fse); getBaseDao().saveFieldSetEntity(fse); if (fse.getInteger("db_type") != null && fse.getInteger("db_type") == 5) { // redis 仅仅连接一哈 try { // System.out.println(fse.getString("ip")); // System.out.println(fse.getString("port")); // System.out.println(fse.getString("pass_word")); // System.out.println(fse.getString("instance")); Jedis jedis = new Jedis(fse.getString("ip"), fse.getInteger("port")); if (!StringUtils.isEmpty(fse.getString("pass_word"))) { jedis.auth(fse.getString("pass_word")); } int dbIndex = StringUtils.isEmpty(fse.getString("instance")) ? 0 : fse.getInteger("instance"); jedis.select(dbIndex); jedis.connect(); jedis.close(); } catch (Exception e) { throw new BaseException(ErrorCode.GET_CONNECTION_FAIL); } } else if (fse.getInteger("db_type") != null && fse.getInteger("db_type") == 6) { // Set sentinels = new HashSet<>(); // System.out.println(fse.getString("ip")); // System.out.println(fse.getString("port")); // System.out.println(fse.getString("pass_word")); // System.out.println(fse.getString("instance")); String[] ip = fse.getString("ip").split(","); // sentinels.add("127.0.0.1:26390"); // sentinels.add("127.0.0.1:26391"); // sentinels.add("127.0.0.1:26392"); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); poolConfig.setMaxIdle(100); poolConfig.setMaxWaitMillis(10000); poolConfig.setTestOnBorrow(true); int connectionTimeout = 5000; int soTimeout = 5000; String password = null; int database = 0; try (JedisSentinelPool jsPool = new JedisSentinelPool(fse.getString("user_name"), Sets.newHashSet(ip), poolConfig, connectionTimeout, soTimeout, password, database); Jedis jedis = jsPool.getResource()) { jedis.connect(); } catch (Exception e) { throw new BaseException(ErrorCode.GET_CONNECTION_FAIL); } } else { DatabaseEntity databaseEntity = new DatabaseEntity(fse); this.addTask(databaseEntity, fse.getUUID()); } if (remoteService != null) { remoteService.saveSyncConnectionConfig(fse); } return fse.getUUID(); } public synchronized void addTableField(DatabaseEntity dbe, String connectionConfigUuid, String tableName) throws BaseException { try (Connection connection = ConnectionManager.getConnection(dbe)) { DataTableEntity dt = new DataTableEntity(); QueryDataService queryDataService = new QueryDataService(connection); getFieldName(queryDataService, tableName, dbe, connectionConfigUuid, dt, getFieldTypeReference(dbe.getDbType()), null); if (!DataTableEntity.isEmpty(dt)) { getBaseDao().add(dt); } } catch (Exception e) { throw new BaseException(e); } } /** * 获取来源表字段 * * @param dbe * @param uuid */ private synchronized void addTask(DatabaseEntity dbe, String uuid) { BaseDao baseDao = getBaseDao(); try (Connection connection = ConnectionManager.getConnection(dbe)) { if (connection != null) { QueryDataService queryDataService = new QueryDataService(connection); String[] tableName = getTableName(queryDataService, dbe); if ((tableName != null && tableName.length > 0) || DatabaseType.PSQL.equals(dbe.getDbType()) || DatabaseType.Oracle.equals(dbe.getDbType())) { Map fieldTypeReference = getFieldTypeReference(dbe.getDbType()); DataTableEntity dt = new DataTableEntity(); if (DatabaseType.Oracle.equals(dbe.getDbType())) { Map>> tableFieldMap = getOracleFieldInfo(queryDataService); tableFieldMap.forEach((k, v) -> { getFieldName(queryDataService, k, dbe, uuid, dt, fieldTypeReference, v); }); } else if (DatabaseType.PSQL.equals(dbe.getDbType())) { Map>> pSqlFieldInfo = getPSqlFieldInfo(dbe.getDbInstance(), queryDataService); pSqlFieldInfo.forEach((k, v) -> { getFieldName(queryDataService, k, dbe, uuid, dt, fieldTypeReference, v); }); } else { for (String table : tableName) { getFieldName(queryDataService, table, dbe, uuid, dt, fieldTypeReference, null); } } if (!DataTableEntity.isEmpty(dt)) { baseDao.delete(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD, "sync_manager_uuid=?", new Object[]{uuid}); baseDao.add(dt); } } } } catch (BaseException e) { throw e; } catch (Exception e) { throw new BaseException(e); } } /** * 创建定时任务 * * @param target_table 目标表 * @param execute_time corn 表达式 * @param task_uuid 同步数据配置 uuid product_sys_data_sync_manager_sub * @param timeTaskUuid 定时任务uuid */ private FieldSetEntity createdTimeTask(String target_table, String execute_time, String task_uuid, String timeTaskUuid) throws BaseException { // FieldSetEntity fse = new FieldSetEntity(); try { fse.setTableName("product_sys_timed_task"); fse.setValue("uuid", timeTaskUuid); fse.setValue("job_name", target_table + "表数据同步");//任务名称 fse.setValue("job_group", "system");//分组 fse.setValue("invoke_target", "syncDataManager.executeDataSyncTask('" + task_uuid + "')");//调用目标字符串 fse.setValue("cron_expression", execute_time);//cron表达式 fse.setValue("misfire_policy", "3");//错误执行策略 只执行一次 fse.setValue("concurrent", 0);//不允许并发执行 fse.setValue("remark", target_table + "表数据同步"); fse.setValue("created_by", SpringMVCContextHolder.getCurrentUser().getUser_id()); fse.setValue("created_utc_datetime", new Date()); fse.setValue("status", 1); fse.setValue("is_conceal", 1); if (!StringUtils.isEmpty(timeTaskUuid)) { sysJobService.updateJob(fse); } else { sysJobService.insertJob(fse); } return fse; } catch (Exception e) { e.printStackTrace(); throw new BaseException(ErrorCode.CRATED_TIMED_TASK_FAIL); } // } /** * 保存字段 * * @param uuid * @param fieldMap * @return */ @Deprecated private DataTableEntity saveTableField(String uuid, Map fieldMap) { DataTableEntity dt = new DataTableEntity(); fieldMap.forEach((k, v) -> { for (String fieldName : v) { FieldSetEntity field = new FieldSetEntity(); field.setTableName(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD); field.setValue("sync_manager_uuid", uuid); //表名 field.setValue("table_name", k); //字段名 field.setValue("field_name", fieldName); dt.addFieldSetEntity(field); } }); return dt; } private Map getFieldTypeReference(DatabaseType dbt) { Map fieldType = new CaseInsensitiveMap<>(); if (dbt == null) { return fieldType; } DataTableEntity dt = getBaseDao().listTable("product_sys_field_type_reference", "db_type=?", new Object[]{dbt.getValue()}); if (DataTableEntity.isEmpty(dt)) { return fieldType; } for (int i = 0; i < dt.getRows(); i++) { fieldType.put(dt.getString(i, "db_field_type").trim(), dt.getString(i, "sys_field_type").trim()); } return fieldType; } /** * oracle 字段元信息 * * @param queryDataService * @return */ private Map>> getOracleFieldInfo(QueryDataService queryDataService) { StringBuilder sql = new StringBuilder(); sql.append(" SELECT "); sql.append(" T .column_name AS column_name, ");//--列名 sql.append(" T .column_type AS column_type, ");//字段类型 sql.append(" T .data_length AS column_length, ");//字段长度 sql.append(" T .data_scale AS column_scale, ");//字段精度 sql.append(" T .column_comment AS column_comment, ");//字段注释 sql.append(" T .table_name AS table_name ");//字段注释 sql.append(" FROM "); sql.append(" ( "); sql.append(" SELECT "); sql.append(" UB.tablespace_name AS database_name, "); sql.append(" UTC.table_name AS table_name, "); sql.append(" UTC.column_name AS column_name, "); sql.append(" UTC.data_length AS data_length, "); sql.append(" UTC.data_type AS column_type, "); sql.append(" utc.data_scale AS data_scale, "); sql.append(" ucc.comments AS column_comment, "); sql.append(" utc.column_id, "); sql.append(" utc.nullable "); sql.append(" FROM "); sql.append(" user_tables ub "); sql.append(" LEFT JOIN user_tab_columns utc ON ub.table_name = UTC.table_name "); sql.append(" LEFT JOIN user_col_comments ucc ON utc.column_name = ucc.column_name "); sql.append(" AND utc.table_name = ucc.table_name "); sql.append(" ) T "); sql.append(" LEFT JOIN ( "); sql.append(" SELECT "); sql.append(" UCC.table_name AS table_name, "); sql.append(" ucc.column_name AS column_name, "); sql.append(" wm_concat (UC.constraint_type) AS constraint_type "); sql.append(" FROM "); sql.append(" user_cons_columns ucc "); sql.append(" LEFT JOIN user_constraints uc ON UCC.constraint_name = UC.constraint_name "); sql.append(" GROUP BY "); sql.append(" UCC.table_name, "); sql.append(" ucc.column_name "); sql.append(" ) b ON T .table_name = b.TABLE_NAME "); sql.append(" AND T .column_name = b.column_name "); // sql.append(" where T.table_name='" + tableName + "' "); QueryResultEntity queryResult = queryDataService.getResult(sql.toString()); List> resultList = queryResult.getResult(); Map>> result = new HashMap<>(); for (Map map : resultList) { String tableName = map.get("table_name") == null ? (String) map.get("table_name".toUpperCase()) : (String) map.get("table_name"); if (tableName == null || tableName.length() == 0) { continue; } List> table_name = result.get(tableName); if (table_name == null) { table_name = new ArrayList<>(); result.put(tableName, table_name); } table_name.add(map); } return result; } /** * Mysql 数据库 字段名称 类型 长度 注释 * * @param tableName * @param queryDataService * @return */ private List> getMysqlFieldInfo(String tableName, QueryDataService queryDataService) { StringBuilder sql = new StringBuilder(); sql.append(" SELECT "); sql.append(" COLUMN_NAME column_name, "); sql.append(" data_type column_type, "); sql.append(" column_type length_info, "); sql.append(" column_comment "); sql.append(" FROM "); sql.append(" INFORMATION_SCHEMA.COLUMNS "); sql.append(" WHERE "); sql.append(" TABLE_NAME = '" + tableName + "' "); sql.append(" AND TABLE_SCHEMA = (select database()) "); QueryResultEntity queryResult = queryDataService.getResult(sql.toString()); List> resultList = queryResult.getResult(); for (int i = 0; i < resultList.size(); i++) { Map map = resultList.get(i); String length_info = (String) map.get("length_info"); Object[] length = {0, 0}; if (length_info.indexOf("(") != -1) { length_info = length_info.substring(length_info.indexOf("(") + 1, length_info.indexOf(")")); if (length_info.indexOf(",") != -1) { //有小数 length = length_info.split(","); } else { length[0] = length_info; } } map.put("column_length", length[0]); map.put("column_scale", length[1]); } return resultList; } /** * sqlserver 数据库 字段名称 类型 长度 注释 * * @param tableName * @param queryDataService * @return */ private List> getSqlServerFieldInfo(String tableName, QueryDataService queryDataService) { StringBuilder sql = new StringBuilder(); sql.append(" SELECT c.name as column_name,t.name as column_type,isnull(c.prec ,0)column_length,isnull(c.scale,0) column_scale "); sql.append(" FROM sysobjects o "); sql.append(" JOIN syscolumns c on o.id=c.id "); sql.append(" JOIN systypes t on c.xusertype=t.xusertype where o.type ='u' and o.name='" + tableName + "' "); QueryResultEntity queryResult = queryDataService.getResult(sql.toString()); List> resultList = queryResult.getResult(); return resultList; } /** * 查询INFORMIX数据库 字段名称 类型 长度 注释 * * @param tableName * @param queryDataService * @return */ private List> getInformixFieldInfo(String dbName, String tableName, QueryDataService queryDataService) { StringBuilder sql = new StringBuilder(); sql.append(" select c.colname column_name, "); sql.append(" case c.coltype "); sql.append(" when '0' then 'CHAR' "); sql.append(" when '1' then 'SMALLINT' "); sql.append(" when '2' then 'INTEGER' "); sql.append(" when '3' then 'FLOAT' "); sql.append(" when '4' then 'SMALLFLOAT' "); sql.append(" when '5' then 'DECIMAL' "); sql.append(" when '6' then 'SERIAL' "); sql.append(" when '7' then 'DATE' "); sql.append(" when '8' then 'MONEY' "); sql.append(" when '9' then 'NULL' "); sql.append(" when '10' then 'DATETIME' "); sql.append(" when '11' then 'BYTE' "); sql.append(" when '12' then 'TEXT' "); sql.append(" when '13' then 'VARCHAR' "); sql.append(" when '14' then 'INTERVAL' "); sql.append(" when '15' then 'NCHAR' "); sql.append(" when '16' then 'NVARCHAR' "); sql.append(" when '17' then 'INT8' "); sql.append(" when '18' then 'SERIAL8' "); sql.append(" when '19' then 'SET' "); sql.append(" when '20' then 'MULTISET' "); sql.append(" when '21' then 'LIST' "); sql.append(" when '22' then 'Unnamed ROW' "); sql.append(" when '40' then 'LVARCHAR' "); sql.append(" when '41' then 'CLOB' "); sql.append(" when '43' then 'BLOB' "); sql.append(" when '44' then 'BOOLEAN' "); sql.append(" when '256' then 'CHAR' "); sql.append(" when '257' then 'SMALLINT' "); sql.append(" when '258' then 'INTEGER' "); sql.append(" when '259' then 'FLOAT' "); sql.append(" when '260' then 'REAL' "); sql.append(" when '261' then 'DECIMAL' "); sql.append(" when '262' then 'SERIAL' "); sql.append(" when '263' then 'DATE' "); sql.append(" when '264' then 'MONEY' "); sql.append(" when '266' then 'DATETIME' "); sql.append(" when '267' then 'BYTE' "); sql.append(" when '268' then 'TEXT' "); sql.append(" when '269' then 'VARCHAR' "); sql.append(" when '270' then 'INTERVAL' "); sql.append(" when '271' then 'NCHAR' "); sql.append(" when '272' then 'NVARCHAR' "); sql.append(" when '273' then 'INT8' "); sql.append(" when '274' then 'SERIAL8' "); sql.append(" when '275' then 'SET' "); sql.append(" when '276' then 'MULTISET' "); sql.append(" when '277' then 'LIST' "); sql.append(" when '278' then 'Unnamed ROW' "); sql.append(" when '296' then 'LVARCHAR' "); sql.append(" when '297' then 'CLOB' "); sql.append(" when '298' then 'BLOB' "); sql.append(" when '299' then 'BOOLEAN' "); sql.append(" when '4118' then 'Named ROW' "); sql.append(" end column_type,c.collength column_length "); sql.append(" from ( "); sql.append(" select * from systables "); sql.append(" where tabtype='T' and owner =? and tabname=? "); sql.append(" ) t "); sql.append(" left join syscolumns c on t.tabid=c.tabid "); QueryResultEntity queryResult = queryDataService.getResult(sql.toString(), new Object[]{dbName, tableName}, null); List> result = queryResult.getResult(); for (int i = 0; i < result.size(); i++) { Map fieldInfo = new CaseInsensitiveMap<>(result.get(i)); result.set(i, fieldInfo); String columnType = (String) fieldInfo.get("column_type"); if (columnType != null) { columnType = columnType.trim(); fieldInfo.put("column_type", columnType); } // if ("prppmain".equalsIgnoreCase(tableName) && "paytimes".equalsIgnoreCase((String) fieldInfo.get("column_name"))) { System.out.println(1); } //日期处理 if ("DATETIME".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) { fieldInfo.put("column_length", 0); } else if ("DECIMAL".equalsIgnoreCase(columnType) || "FLOAT".equalsIgnoreCase(columnType) || "SMALLFLOAT".equalsIgnoreCase(columnType) || "MONEY".equalsIgnoreCase(columnType)) { if (fieldInfo.get("column_length") != null) { fieldInfo.put("column_length", fieldInfo.get("column_length").toString()); } String columnLength = (String) fieldInfo.get("column_length"); if (!NumberUtil.isNumber(columnLength)) { fieldInfo.put("column_length", 11); fieldInfo.put("column_scale", 2); continue; } else { int colLength = NumberUtil.parseInt(columnLength); int column_length = colLength >> 8; int column_scale = colLength & 255; if (column_scale > 30) { column_scale = 30; } if (column_length < column_scale) { column_length = column_scale; } fieldInfo.put("column_length", column_length); fieldInfo.put("column_scale", column_scale); } } else { Object columnLength = fieldInfo.get("column_length"); if (columnLength != null && NumberUtil.parseInt(columnLength.toString()) > 8000) { fieldInfo.put("column_length", 8000); } fieldInfo.put("column_scale", 0); } } return result; } /** * 查询PSQL数据库 字段名称 类型 长度 注释 * * @param queryDataService * @return */ private Map>> getPSqlFieldInfo(String instance, QueryDataService queryDataService) { StringBuilder sql = new StringBuilder(); sql.append(" SELECT case when column_length<0 then 0 else column_length end as column_length,table_name,\"column_name\",column_type,column_comment FROM ( "); sql.append(" SELECT C "); sql.append(" .relname AS \"table_name\", "); sql.append(" A.attname AS \"column_name\", "); sql.append(" ( CASE WHEN A.attlen > 0 THEN A.attlen ELSE A.atttypmod - 4 END ) AS \"column_length\", "); sql.append(" replace(format_type ( A.atttypid, A.atttypmod ),substring(format_type ( A.atttypid, A.atttypmod ),\"position\"(format_type ( A.atttypid, A.atttypmod ),'('),\"position\"" + "(format_type ( A.atttypid, A.atttypmod ),')')),'') AS \"column_type\", "); sql.append(" d.description AS \"column_comment\" "); sql.append(" FROM "); sql.append(" pg_attribute "); sql.append(" A LEFT JOIN pg_description d ON d.objoid = A.attrelid "); sql.append(" AND d.objsubid = A.attnum "); sql.append(" LEFT JOIN pg_class C ON A.attrelid = C.oid "); sql.append(" LEFT JOIN pg_type T ON A.atttypid = T.oid "); sql.append(" WHERE "); sql.append(" A.attnum >= 0 "); sql.append(" AND C.relname IN ( SELECT \"tablename\" FROM pg_tables WHERE schemaname = ? ) "); sql.append(" ) A "); QueryResultEntity queryResult = queryDataService.getResult(sql.toString(), new Object[]{instance}, null); List> resultList = queryResult.getResult(); Map>> result = new HashMap<>(); for (int i = 0; i < resultList.size(); i++) { Map fieldInfo = new CaseInsensitiveMap<>(resultList.get(i)); resultList.set(i, fieldInfo); String column_type = (String) fieldInfo.get("column_type"); if (fieldInfo.get("column_length") == null) { fieldInfo.put("column_length", 20); } String tableName = (String) fieldInfo.get("table_name"); if (StringUtils.isEmpty(tableName)) { continue; } //小数需要转换 if (column_type.toLowerCase().indexOf("numeric") != -1 && NumberUtil.isNumber(fieldInfo.get("column_length").toString())) { int columnLength = NumberUtil.parseInt(fieldInfo.get("column_length").toString()); int column_length = columnLength >> 16; int column_scale = columnLength & 255; if (column_scale > 30) { column_scale = 30; } if (column_length < column_scale) { column_length = column_scale; } fieldInfo.put("column_length", column_length); fieldInfo.put("column_scale", column_scale); } else if ("date".equalsIgnoreCase(column_type)) { //日期 fieldInfo.put("column_length", 0); fieldInfo.put("column_scale", 0); } else if ("real".equalsIgnoreCase(column_type)) { //可变精度 将小数 和长度都设为一致 PSQL 数据库 float 最大支持 15位 MYSQL 30位 fieldInfo.put("column_length", fieldInfo.get("column_length")); fieldInfo.put("column_scale", fieldInfo.get("column_length")); } else if ("text".equalsIgnoreCase(column_type)) { fieldInfo.put("column_length", 8000); } else { fieldInfo.put("column_scale", 0); } List> table_name = result.get(tableName); if (table_name == null) { table_name = new ArrayList<>(); result.put(tableName, table_name); } table_name.add(fieldInfo); } return result; } /** * 获取字段名称 * * @param queryDataService * @param tableName * @param dbe * @return */ private void getFieldName(QueryDataService queryDataService, String tableName, DatabaseEntity dbe, String uuid, DataTableEntity dt, Map fieldTypeReference, List> resultList) { DatabaseType dbt = dbe.getDbType(); if (DatabaseType.MySql.equals(dbt)) { resultList = getMysqlFieldInfo(tableName, queryDataService); } else if (DatabaseType.Oracle.equals(dbt)) { // resultList = getOracleFieldInfoSql(tableName, queryDataService); } else if (DatabaseType.SqlServer.equals(dbt)) { resultList = getSqlServerFieldInfo(tableName, queryDataService); } else if (DatabaseType.Informix.equals(dbt)) { // QueryResultEntity queryResult = queryDataService.getResult(" SELECT colname COLUMN_NAME from \"informix\".syscolumns where tabid = ( " + // String.format(" SELECT tabid FROM \"informix\".systables WHERE tabname = '%s') ", tableName)); resultList = getInformixFieldInfo(dbe.getDbName(), tableName, queryDataService); } else if (DatabaseType.PSQL.equals(dbt)) { } else { return; } if (resultList != null) { for (int i = 0; i < resultList.size(); i++) { // 字段名 Object column_mame = resultList.get(i).get("column_name"); if (column_mame == null) { column_mame = resultList.get(i).get("column_name".toUpperCase()); } // 注释 Object column_comment = resultList.get(i).get("column_comment"); if (column_comment == null) { column_comment = resultList.get(i).get("column_comment".toUpperCase()); } // 类型 Object column_type = resultList.get(i).get("column_type"); if (column_type == null) { column_type = resultList.get(i).get("column_type".toUpperCase()); } //长度 Object column_length = resultList.get(i).get("column_length"); if (column_length == null) { column_length = resultList.get(i).get("column_length".toUpperCase()); } // 精度 Object column_scale = resultList.get(i).get("column_scale"); if (column_scale == null) { column_scale = resultList.get(i).get("column_scale".toUpperCase()); } FieldSetEntity field = new FieldSetEntity(); field.setTableName(CmnConst.PRODUCT_SYS_DATABASE_TABLENAME_FIELD); field.setValue("sync_manager_uuid", uuid); //表名 field.setValue("table_name", tableName.toLowerCase()); //字段名 field.setValue("field_name", column_mame.toString().toLowerCase()); //字段描述 field.setValue("field_desc", column_comment); //字段长度 field.setValue("field_length", "text".equalsIgnoreCase(column_type != null ? column_type.toString() : "") ? 0 : column_length); //字段精度 field.setValue("field_unit", column_scale); //字段类型 String field_type = fieldTypeReference == null ? null : fieldTypeReference.get(column_type); if (field_type == null) { System.out.println(field_type); } field.setValue(CmnConst.FIELD_TYPE, field_type == null ? "string" : field_type); dt.addFieldSetEntity(field); } } } /** * 获取表名 * * @param queryDataService * @param dbe * @return */ private String[] getTableName(QueryDataService queryDataService, DatabaseEntity dbe) { if (dbe != null && dbe.getDbType() != null) { DatabaseType dbt = dbe.getDbType(); String sql = null; if (DatabaseType.MySql.equals(dbt)) { //数据库名称 sql = String.format("select TABLE_NAME as table_name from information_schema.COLUMNS WHERE TABLE_SCHEMA = '%s' group by TABLE_NAME", dbe.getDbName()); } else if (DatabaseType.Oracle.equals(dbt)) { //用户名 分大小写 // sql = String.format("select TABLE_NAME as table_name from user_tab_columns GROUP BY TABLE_NAME "); //oracle使用一次性查询方式获取表字段 return null; } else if (DatabaseType.SqlServer.equals(dbt)) { //数据库名称 sql = String.format("SELECT name as table_name FROM %s..SysObjects Where XType='U' ", dbe.getDbName()); } else if (DatabaseType.Informix.equals(dbt)) { //数据库名称 sql = String.format("SELECT tabname as table_name FROM \"informix\".systables WHERE tabtype = 'T' AND tabid >= 100 and owner ='%s'", dbe.getDbName()); } else if (DatabaseType.PSQL.equals(dbt)) { //PSQL使用一次性查询方式获取表字段 return null; } if (sql != null && queryDataService != null) { QueryResultEntity queryResult = queryDataService.getResult(sql); List> resultList = queryResult.getResult(); String[] tableName = new String[resultList.size()]; for (int i = 0; i < resultList.size(); i++) { Object table_name = resultList.get(i).get("table_name"); if (table_name == null) { table_name = resultList.get(i).get("table_name".toUpperCase()); } tableName[i] = (String) table_name; } return tableName; } } return new String[]{}; } private void getSourceTableField(Connection connection, DatabaseType dbt, String uuid) { String sql = null; // if (DatabaseType.MySql.equals(dbt)) { //数据库名称 // sql = "select COLUMN_NAME from information_schema.COLUMNS WHERE TABLE_SCHEMA = '" + configField.getString(CmnConst.DATABASE_NAME) + "' AND TABLE_NAME = '" + tableName + "'"; // } else if (DatabaseType.Oracle.equals(dbt)) { //用户名 分大小写 // sql = "select COLUMN_NAME from all_tab_columns WHERE OWNER = '" + configField.getString(CmnConst.USER_NAME) + "' AND TABLE_NAME = '" + tableName + "'"; // } else if (DatabaseType.SqlServer.equals(dbt)) { // //数据库名称 // sql = "SELECT name TABLE_NAME FROM " + fs.getString(CmnConst.DATABASE_NAME) + "..SysColumns WHERE id = ( " + // " SELECT id FROM " + fs.getString(CmnConst.DATABASE_NAME) + "..SysObjects WHERE XType='U' and name = '" + tableName + "')"; // } else if (DatabaseType.Informix.equals(dbt)) { //数据库名称 // sql = " SELECT colname COLUMN_NAME from \"informix\".syscolumns where tabid = ( " + // " SELECT tabid FROM \"informix\".systables WHERE tabname = '" + tableName + "'); "; // } } }