zoukankan      html  css  js  c++  java
  • 使用触发器和C#程序实现数据同步

    项目中用到了Quartz,也算是Quartz的一个示例。

    源库创建数据同步表SYNC_DATA:

    CREATE TABLE SYNC_DATA(
        ID                             VARCHAR2(36)      NOT NULL,
        TABLE_NAME                     VARCHAR2(50)      NOT NULL,
        COMMAND                        VARCHAR2(10)      NOT NULL,
        TABLE_KEY                      VARCHAR2(50)      NOT NULL,
        DATA_ID                        VARCHAR2(50)      NOT NULL,
        FAIL_COUNT                     INT               NOT NULL,
        CREATE_TIME                    DATE              NOT NULL,
        UPDATE_TIME                    DATE ,
        CONSTRAINT sync_data_PK PRIMARY KEY (ID)
    )
    ;
    
    COMMENT ON TABLE SYNC_DATA IS '业务数据同步表'
    ;
    COMMENT ON COLUMN SYNC_DATA.ID IS '主键'
    ;
    COMMENT ON COLUMN SYNC_DATA.TABLE_NAME IS '表名'
    ;
    COMMENT ON COLUMN SYNC_DATA.COMMAND IS '命令'
    ;
    COMMENT ON COLUMN SYNC_DATA.TABLE_KEY IS '表主键'
    ;
    COMMENT ON COLUMN SYNC_DATA.DATA_ID IS '数据ID'
    ;
    COMMENT ON COLUMN SYNC_DATA.FAIL_COUNT IS '失败次数'
    ;
    COMMENT ON COLUMN SYNC_DATA.CREATE_TIME IS '创建时间'
    ;
    COMMENT ON COLUMN SYNC_DATA.UPDATE_TIME IS '更新时间'
    ;
    View Code

    新增修改删除触发器示例(oracle版):

    CREATE OR REPLACE TRIGGER "T_PT_CAMERA_INFO_I"
    AFTER INSERT ON "SHINY"."PT_CAMERA_INFO" FOR EACH ROW
    BEGIN
        INSERT INTO sync_data(ID, TABLE_NAME, COMMAND, TABLE_KEY, DATA_ID, FAIL_COUNT, CREATE_TIME)
        VALUES(sys_guid(), 'PT_CAMERA_INFO', 'INSERT',  'ID', :new.ID || '', 0,  sysdate);
    END;
    
    CREATE OR REPLACE TRIGGER "T_PT_CAMERA_INFO_U"
    AFTER UPDATE ON "SHINY"."PT_CAMERA_INFO" FOR EACH ROW
    BEGIN
        IF (NVL(to_char(:new.ID), 'X') != NVL(to_char(:old.ID), 'X'))
            OR (NVL(to_char(:new.ASSET_ID), 'X') != NVL(to_char(:old.ASSET_ID), 'X'))
            OR (NVL(to_char(:new.CAMERA_NO), 'X') != NVL(to_char(:old.CAMERA_NO), 'X'))
            OR (NVL(to_char(:new.POSITION_CODE), 'X') != NVL(to_char(:old.POSITION_CODE), 'X'))
            OR (NVL(to_char(:new.POSITION_ID), 'X') != NVL(to_char(:old.POSITION_ID), 'X'))
            OR (NVL(to_char(:new.CAMERA_NAME), 'X') != NVL(to_char(:old.CAMERA_NAME), 'X'))
            OR (NVL(to_char(:new.ORG_ID), 'X') != NVL(to_char(:old.ORG_ID), 'X'))
            OR (NVL(to_char(:new.ADDRESS), 'X') != NVL(to_char(:old.ADDRESS), 'X'))
            OR (NVL(to_char(:new.CAMERA_IP), 'X') != NVL(to_char(:old.CAMERA_IP), 'X'))
            OR (NVL(to_char(:new.LONGITUDE), 'X') != NVL(to_char(:old.LONGITUDE), 'X'))
            OR (NVL(to_char(:new.LATITUDE), 'X') != NVL(to_char(:old.LATITUDE), 'X'))
            OR (NVL(to_char(:new.SN), 'X') != NVL(to_char(:old.SN), 'X'))
            OR (NVL(to_char(:new.CAMERA_MODEL), 'X') != NVL(to_char(:old.CAMERA_MODEL), 'X'))
            OR (NVL(to_char(:new.MANUFACTURER), 'X') != NVL(to_char(:old.MANUFACTURER), 'X'))
            OR (NVL(to_char(:new.SUPPLIER), 'X') != NVL(to_char(:old.SUPPLIER), 'X'))
            OR (NVL(to_char(:new.ADD_ID), 'X') != NVL(to_char(:old.ADD_ID), 'X'))
            OR (NVL(to_char(:new.ADD_TIME, 'yyyy-mm-dd hh24:mi:ss'),'X') != NVL(to_char(:old.ADD_TIME, 'yyyy-mm-dd hh24:mi:ss'),'X'))
            OR (NVL(to_char(:new.MODIFY_ID), 'X') != NVL(to_char(:old.MODIFY_ID), 'X'))
            OR (NVL(to_char(:new.MODIFY_TIME, 'yyyy-mm-dd hh24:mi:ss'),'X') != NVL(to_char(:old.MODIFY_TIME, 'yyyy-mm-dd hh24:mi:ss'),'X'))
            OR (NVL(to_char(:new.USER_NAME), 'X') != NVL(to_char(:old.USER_NAME), 'X'))
            OR (NVL(to_char(:new.PASS_WORD), 'X') != NVL(to_char(:old.PASS_WORD), 'X'))
            OR (NVL(to_char(:new.CASE_INSTALL_TYPE), 'X') != NVL(to_char(:old.CASE_INSTALL_TYPE), 'X'))
            OR (NVL(to_char(:new.AREA_CODE), 'X') != NVL(to_char(:old.AREA_CODE), 'X'))
            OR (NVL(to_char(:new.MONITOR_TYPE), 'X') != NVL(to_char(:old.MONITOR_TYPE), 'X'))
            OR (NVL(to_char(:new.POINT_NAME), 'X') != NVL(to_char(:old.POINT_NAME), 'X'))
            OR (NVL(to_char(:new.CAMERA_IP6), 'X') != NVL(to_char(:old.CAMERA_IP6), 'X'))
            OR (NVL(to_char(:new.SUBNET_MASK), 'X') != NVL(to_char(:old.SUBNET_MASK), 'X'))
            OR (NVL(to_char(:new.GATEWAY), 'X') != NVL(to_char(:old.GATEWAY), 'X'))
            OR (NVL(to_char(:new.MAC_ADDRESS), 'X') != NVL(to_char(:old.MAC_ADDRESS), 'X'))
            OR (NVL(to_char(:new.ONU_SN), 'X') != NVL(to_char(:old.ONU_SN), 'X'))
            OR (NVL(to_char(:new.CAMERA_TYPE), 'X') != NVL(to_char(:old.CAMERA_TYPE), 'X'))
            OR (NVL(to_char(:new.CAMERA_FUN_TYPE), 'X') != NVL(to_char(:old.CAMERA_FUN_TYPE), 'X'))
            OR (NVL(to_char(:new.FILL_LIGTH_ATTR), 'X') != NVL(to_char(:old.FILL_LIGTH_ATTR), 'X'))
            OR (NVL(to_char(:new.CAMERA_ENCODE_TYPE), 'X') != NVL(to_char(:old.CAMERA_ENCODE_TYPE), 'X'))
            OR (NVL(to_char(:new.POWER_TAKE_TYPE), 'X') != NVL(to_char(:old.POWER_TAKE_TYPE), 'X'))
            OR (NVL(to_char(:new.POWER_TAKE_LENGTH), 'X') != NVL(to_char(:old.POWER_TAKE_LENGTH), 'X'))
            OR (NVL(to_char(:new.SOUND_ALARM), 'X') != NVL(to_char(:old.SOUND_ALARM), 'X'))
            OR (NVL(to_char(:new.RESOLUTION), 'X') != NVL(to_char(:old.RESOLUTION), 'X'))
            OR (NVL(to_char(:new.SOFT_VERSION), 'X') != NVL(to_char(:old.SOFT_VERSION), 'X'))
            OR (NVL(to_char(:new.LENS_PARAM), 'X') != NVL(to_char(:old.LENS_PARAM), 'X'))
            OR (NVL(to_char(:new.IS_HAVE_CONSOLE), 'X') != NVL(to_char(:old.IS_HAVE_CONSOLE), 'X'))
            OR (NVL(to_char(:new.INSTALL_WAY), 'X') != NVL(to_char(:old.INSTALL_WAY), 'X'))
            OR (NVL(to_char(:new.LINEAR_WAY), 'X') != NVL(to_char(:old.LINEAR_WAY), 'X'))
            OR (NVL(to_char(:new.RESOURCE_PLACE), 'X') != NVL(to_char(:old.RESOURCE_PLACE), 'X'))
            OR (NVL(to_char(:new.IMPORT_WATCH), 'X') != NVL(to_char(:old.IMPORT_WATCH), 'X'))
            OR (NVL(to_char(:new.POSITION_TYPE), 'X') != NVL(to_char(:old.POSITION_TYPE), 'X'))
            OR (NVL(to_char(:new.COMMUNITY), 'X') != NVL(to_char(:old.COMMUNITY), 'X'))
            OR (NVL(to_char(:new.STREET), 'X') != NVL(to_char(:old.STREET), 'X'))
            OR (NVL(to_char(:new.WATCH_SPEC_LOCATION), 'X') != NVL(to_char(:old.WATCH_SPEC_LOCATION), 'X'))
            OR (NVL(to_char(:new.ROAD_DIRECTION), 'X') != NVL(to_char(:old.ROAD_DIRECTION), 'X'))
            OR (NVL(to_char(:new.FOUL_LINE), 'X') != NVL(to_char(:old.FOUL_LINE), 'X'))
            OR (NVL(to_char(:new.FEN_JU), 'X') != NVL(to_char(:old.FEN_JU), 'X'))
            OR (NVL(to_char(:new.POLICE_STATION), 'X') != NVL(to_char(:old.POLICE_STATION), 'X'))
            OR (NVL(to_char(:new.CAMERA_DIRECTION), 'X') != NVL(to_char(:old.CAMERA_DIRECTION), 'X'))
            OR (NVL(to_char(:new.INSTALL_HEIGHT), 'X') != NVL(to_char(:old.INSTALL_HEIGHT), 'X'))
            OR (NVL(to_char(:new.CROSS_ARM1), 'X') != NVL(to_char(:old.CROSS_ARM1), 'X'))
            OR (NVL(to_char(:new.CROSS_ARM2), 'X') != NVL(to_char(:old.CROSS_ARM2), 'X'))
            OR (NVL(to_char(:new.INDOOR_OR_NOT), 'X') != NVL(to_char(:old.INDOOR_OR_NOT), 'X'))
            OR (NVL(to_char(:new.SPECIAL_PHOTO_PATH), 'X') != NVL(to_char(:old.SPECIAL_PHOTO_PATH), 'X'))
            OR (NVL(to_char(:new.LOCATION_PHOTO_PATH), 'X') != NVL(to_char(:old.LOCATION_PHOTO_PATH), 'X'))
            OR (NVL(to_char(:new.REAL_PHOTO_PATH), 'X') != NVL(to_char(:old.REAL_PHOTO_PATH), 'X'))
            OR (NVL(to_char(:new.NETWORK_PROPERTIES), 'X') != NVL(to_char(:old.NETWORK_PROPERTIES), 'X'))
            OR (NVL(to_char(:new.POLICE_AREA_CODE), 'X') != NVL(to_char(:old.POLICE_AREA_CODE), 'X'))
            OR (NVL(to_char(:new.INSTALL_PERSION), 'X') != NVL(to_char(:old.INSTALL_PERSION), 'X'))
            OR (NVL(to_char(:new.INSTALL_TIME, 'yyyy-mm-dd hh24:mi:ss'),'X') != NVL(to_char(:old.INSTALL_TIME, 'yyyy-mm-dd hh24:mi:ss'),'X'))
            OR (NVL(to_char(:new.BUILD_PERIOD), 'X') != NVL(to_char(:old.BUILD_PERIOD), 'X'))
            OR (NVL(to_char(:new.PROJECT_NAME), 'X') != NVL(to_char(:old.PROJECT_NAME), 'X'))
            OR (NVL(to_char(:new.MANAGER_UNIT), 'X') != NVL(to_char(:old.MANAGER_UNIT), 'X'))
            OR (NVL(to_char(:new.MANAGER_UNIT_TEL), 'X') != NVL(to_char(:old.MANAGER_UNIT_TEL), 'X'))
            OR (NVL(to_char(:new.MAINTAIN_UNIT), 'X') != NVL(to_char(:old.MAINTAIN_UNIT), 'X'))
            OR (NVL(to_char(:new.MAINTAIN_UNIT_TEL), 'X') != NVL(to_char(:old.MAINTAIN_UNIT_TEL), 'X'))
            OR (NVL(to_char(:new.RECODE_SAVE_TYPE), 'X') != NVL(to_char(:old.RECODE_SAVE_TYPE), 'X'))
            OR (NVL(to_char(:new.DEVICE_STATE), 'X') != NVL(to_char(:old.DEVICE_STATE), 'X'))
            OR (NVL(to_char(:new.INDUSTRY_OWN), 'X') != NVL(to_char(:old.INDUSTRY_OWN), 'X'))
            OR (NVL(to_char(:new.IS_REGISTER_IMOS), 'X') != NVL(to_char(:old.IS_REGISTER_IMOS), 'X'))
            OR (NVL(to_char(:new.IS_WIFI), 'X') != NVL(to_char(:old.IS_WIFI), 'X'))
            OR (NVL(to_char(:new.IS_FLASH), 'X') != NVL(to_char(:old.IS_FLASH), 'X'))
            OR (NVL(to_char(:new.CAMERA_NO_STR), 'X') != NVL(to_char(:old.CAMERA_NO_STR), 'X'))
            OR (NVL(to_char(:new.CAMERA_VCN_CODE), 'X') != NVL(to_char(:old.CAMERA_VCN_CODE), 'X'))
            OR (NVL(to_char(:new.FIELD_NO), 'X') != NVL(to_char(:old.FIELD_NO), 'X'))
            OR (NVL(to_char(:new.KEY_UNIT), 'X') != NVL(to_char(:old.KEY_UNIT), 'X'))
            OR (NVL(to_char(:new.UNIT_TYPE), 'X') != NVL(to_char(:old.UNIT_TYPE), 'X'))
            OR (NVL(to_char(:new.SHOW_LEVEL), 'X') != NVL(to_char(:old.SHOW_LEVEL), 'X'))
            OR (NVL(to_char(:new.PROTOCOL_TYPE), 'X') != NVL(to_char(:old.PROTOCOL_TYPE), 'X'))
            OR (NVL(to_char(:new.CAMERA_PORT), 'X') != NVL(to_char(:old.CAMERA_PORT), 'X'))
            OR (NVL(to_char(:new.INTERFACE_TYPE), 'X') != NVL(to_char(:old.INTERFACE_TYPE), 'X'))
            OR (NVL(to_char(:new.CHANNEL), 'X') != NVL(to_char(:old.CHANNEL), 'X'))
            OR (NVL(to_char(:new.USER_OBJECT), 'X') != NVL(to_char(:old.USER_OBJECT), 'X'))
            OR (NVL(to_char(:new.IMG_PATH), 'X') != NVL(to_char(:old.IMG_PATH), 'X'))
            OR (NVL(to_char(:new.CAMERA_DESC), 'X') != NVL(to_char(:old.CAMERA_DESC), 'X'))
            OR (NVL(to_char(:new.IS_REGISTER_VCN), 'X') != NVL(to_char(:old.IS_REGISTER_VCN), 'X'))
            OR (NVL(to_char(:new.IS_DEL), 'X') != NVL(to_char(:old.IS_DEL), 'X'))
            OR (NVL(to_char(:new.ORDER_VALUE), 'X') != NVL(to_char(:old.ORDER_VALUE), 'X'))
            OR (NVL(to_char(:new.POLLING_RESULT), 'X') != NVL(to_char(:old.POLLING_RESULT), 'X'))
            OR (NVL(to_char(:new.SERVER_ID), 'X') != NVL(to_char(:old.SERVER_ID), 'X'))
            OR (NVL(to_char(:new.SHORT_MSG), 'X') != NVL(to_char(:old.SHORT_MSG), 'X'))
            OR (NVL(to_char(:new.CAMERA_BELONGS_ID), 'X') != NVL(to_char(:old.CAMERA_BELONGS_ID), 'X'))
            OR (NVL(to_char(:new.RELATED_CUSTOMS), 'X') != NVL(to_char(:old.RELATED_CUSTOMS), 'X'))
            OR (NVL(to_char(:new.ADDED_TO_SDE), 'X') != NVL(to_char(:old.ADDED_TO_SDE), 'X'))
            OR (NVL(to_char(:new.CAMERA_BAK), 'X') != NVL(to_char(:old.CAMERA_BAK), 'X'))
            OR (NVL(to_char(:new.CAMERA_BELONGS_PK), 'X') != NVL(to_char(:old.CAMERA_BELONGS_PK), 'X'))
            OR (NVL(to_char(:new.MEMBERBAR_CODE), 'X') != NVL(to_char(:old.MEMBERBAR_CODE), 'X'))
            OR (NVL(to_char(:new.IS_BRANCH), 'X') != NVL(to_char(:old.IS_BRANCH), 'X'))
            OR (NVL(to_char(:new.IS_WATCHPOS), 'X') != NVL(to_char(:old.IS_WATCHPOS), 'X'))
            OR (NVL(to_char(:new.CAMERA_ANGLE), 'X') != NVL(to_char(:old.CAMERA_ANGLE), 'X'))
            OR (NVL(to_char(:new.CROSS_ARM3), 'X') != NVL(to_char(:old.CROSS_ARM3), 'X'))
            OR (NVL(to_char(:new.IS_SYS), 'X') != NVL(to_char(:old.IS_SYS), 'X'))
            OR (NVL(to_char(:new.RECORD_TIME), 'X') != NVL(to_char(:old.RECORD_TIME), 'X'))
            OR (NVL(to_char(:new.ANALYSIS_NO), 'X') != NVL(to_char(:old.ANALYSIS_NO), 'X'))
            OR (NVL(to_char(:new.WIFI_STATE), 'X') != NVL(to_char(:old.WIFI_STATE), 'X'))
            OR (NVL(to_char(:new.FACE_TASK_STATUS), 'X') != NVL(to_char(:old.FACE_TASK_STATUS), 'X'))
            OR (NVL(to_char(:new.VIDEO_TASK_STATUS), 'X') != NVL(to_char(:old.VIDEO_TASK_STATUS), 'X'))
            OR (NVL(to_char(:new.BAYONET_TASK_STATUS), 'X') != NVL(to_char(:old.BAYONET_TASK_STATUS), 'X'))
            OR (NVL(to_char(:new.VQD_URL), 'X') != NVL(to_char(:old.VQD_URL), 'X'))
            OR (NVL(to_char(:new.SYS_TYPE), 'X') != NVL(to_char(:old.SYS_TYPE), 'X'))
            OR (NVL(to_char(:new.IS_HAVE_CAPTURE), 'X') != NVL(to_char(:old.IS_HAVE_CAPTURE), 'X'))
    
        THEN
            INSERT INTO sync_data(ID, TABLE_NAME, COMMAND, TABLE_KEY, DATA_ID, FAIL_COUNT, CREATE_TIME)
            VALUES(sys_guid(), 'PT_CAMERA_INFO', 'UPDATE',  'ID', :new.ID || '', 0,  sysdate);
        END IF;
    END;
    
    CREATE OR REPLACE TRIGGER "T_PT_CAMERA_INFO_D"
    BEFORE DELETE ON "SHINY"."PT_CAMERA_INFO" FOR EACH ROW
    BEGIN
        INSERT INTO sync_data(ID, TABLE_NAME, COMMAND, TABLE_KEY, DATA_ID, FAIL_COUNT, CREATE_TIME)
        VALUES(sys_guid(), 'PT_CAMERA_INFO', 'DELETE',  'ID', :old.ID || '', 0,  sysdate);
    END;
    
    CREATE OR REPLACE TRIGGER "T_VIDEO_VIDEOSOURCE_I"
    AFTER INSERT ON "SHINY"."VIPF_VIDEO_VIDEOSOURCE" FOR EACH ROW
    BEGIN
        INSERT INTO sync_data(ID, TABLE_NAME, COMMAND, TABLE_KEY, DATA_ID, FAIL_COUNT, CREATE_TIME)
        VALUES(sys_guid(), 'VIPF_VIDEO_VIDEOSOURCE', 'INSERT',  'CAMERA_ID', :new.CAMERA_ID || '', 0,  sysdate);
    END;
    
    CREATE OR REPLACE TRIGGER "T_VIDEO_VIDEOSOURCE_U"
    AFTER UPDATE ON "SHINY"."VIPF_VIDEO_VIDEOSOURCE" FOR EACH ROW
    BEGIN
        INSERT INTO sync_data(ID, TABLE_NAME, COMMAND, TABLE_KEY, DATA_ID, FAIL_COUNT, CREATE_TIME)
        VALUES(sys_guid(), 'VIPF_VIDEO_VIDEOSOURCE', 'UPDATE',  'CAMERA_ID', :new.CAMERA_ID || '', 0,  sysdate);
    END;
    
    CREATE OR REPLACE TRIGGER "T_VIDEO_VIDEOSOURCE_D"
    BEFORE DELETE ON "SHINY"."VIPF_VIDEO_VIDEOSOURCE" FOR EACH ROW
    BEGIN
        INSERT INTO sync_data(ID, TABLE_NAME, COMMAND, TABLE_KEY, DATA_ID, FAIL_COUNT, CREATE_TIME)
        VALUES(sys_guid(), 'VIPF_VIDEO_VIDEOSOURCE', 'DELETE',  'CAMERA_ID', :old.CAMERA_ID || '', 0,  sysdate);
    END;
    View Code

    启动数据同步任务C#代码:

    using DataSync.DAL;
    using DataSync.Utils;
    using Utils;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Configuration;
    using System.Data;
    using System.Diagnostics;
    using System.Linq;
    using System.ServiceProcess;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Timers;
    using System.Windows.Forms;
    using Quartz.Impl;
    using Quartz;
    using DataSync.Jobs;
    using Models;
    
    namespace DataSync
    {
        /// <summary>
        /// 数据同步服务
        /// by suxiang
        /// </summary>
        partial class DataSyncService : ServiceBase
        {
            #region 变量
    
            #endregion
    
            #region 构造函数
            public DataSyncService()
            {
                InitializeComponent();
            }
            #endregion
    
            #region OnStart 启动服务
            protected override void OnStart(string[] args)
            {
                startScheduler();
    
                LogUtil.Log("服务启动成功");
            }
            #endregion
    
            #region OnStop 停止服务
            protected override void OnStop()
            {
                LogUtil.Log("服务停止成功");
    
                Thread.Sleep(100); //等待一会,待日志写入文件
            }
            #endregion
    
            #region Start 启动服务
            public void Start()
            {
                OnStart(null);
            }
            #endregion
    
            #region scheduleJob
            private async void startScheduler()
            {
                TaskSchedulerEx taskEx = new TaskSchedulerEx(8, 64);
                StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
                IScheduler scheduler = await schedulerFactory.GetScheduler();
                await scheduler.Start();
                await LogUtil.Log("任务调度器已启动");
    
                if (ConfigUtil.SyncParam == 1) //部署在政务网侧 
                {
                    scheduleJob(scheduler, "政务网中间库->政务网平台库", ConfigUtil.ZWW_Middle, ConfigUtil.ZWW_PT, taskEx);
                    scheduleJob(scheduler, "政务网平台库->政务网中间库", ConfigUtil.ZWW_PT, ConfigUtil.ZWW_Middle, taskEx);
                }
    
                if (ConfigUtil.SyncParam == 2) //部署在视频网侧
                {
                    scheduleJob(scheduler, "视频网平台库->视频网中间库", ConfigUtil.SPW_PT, ConfigUtil.SPW_Middle, taskEx);
                }
            }
    
            private async void scheduleJob(IScheduler scheduler, string taskName, string dbSourceConnectionString, string dbTargetConnectionString, TaskSchedulerEx taskEx)
            {
                string cronString = "0/5 * * * * ?";
                IJobDetail jobDetail = JobBuilder.Create<SyncJob>().Build();
                ITrigger trigger = TriggerBuilder.Create().WithCronSchedule(cronString).Build();
                SpeedInfo speedInfo = new SpeedInfo();
                jobDetail.JobDataMap.Put("taskName", taskName);
                jobDetail.JobDataMap.Put("dbSourceConnectionString", dbSourceConnectionString);
                jobDetail.JobDataMap.Put("dbTargetConnectionString", dbTargetConnectionString);
                jobDetail.JobDataMap.Put("speedInfo", speedInfo);
                jobDetail.JobDataMap.Put("taskEx", taskEx);
                await scheduler.ScheduleJob(jobDetail, trigger);
    
                await scheduleCalcSpeedJob(scheduler, taskName, speedInfo);
            }
    
            private async Task scheduleCalcSpeedJob(IScheduler scheduler, string taskName, SpeedInfo speedInfo)
            {
                int timeRange = 10;
                string cronString = "0/" + timeRange + " * * * * ?";
                IJobDetail jobDetail = JobBuilder.Create<CalcSpeedJob>().Build();
                ITrigger trigger = TriggerBuilder.Create().WithCronSchedule(cronString).Build();
                jobDetail.JobDataMap.Put("taskName", taskName);
                jobDetail.JobDataMap.Put("timeRange", timeRange);
                jobDetail.JobDataMap.Put("speedInfo", speedInfo);
                await scheduler.ScheduleJob(jobDetail, trigger);
            }
            #endregion
    
        }
    }
    View Code

    数据同步任务C#代码:

    using DataSync.DAL;
    using DataSync.Models;
    using DataSync.Utils;
    using Models;
    using Oracle.ManagedDataAccess.Client;
    using Quartz;
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Data;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using Utils;
    
    namespace DataSync.Jobs
    {
        /// <summary>
        /// 数据同步任务
        /// </summary>
        [DisallowConcurrentExecution]
        public class SyncJob : IJob
        {
            #region 变量属性
            private readonly string INSERT = "INSERT";
            private readonly string UPDATE = "UPDATE";
            private readonly string DELETE = "DELETE";
            #endregion
    
            #region 数据库操作类
            private OracleHelper _dbSource = null;
            private OracleHelper _dbTarget = null;
            #endregion
    
            #region 任务参数
            private string _taskName = null;
            private string _dbSourceConnectionString = null;
            private string _dbTargetConnectionString = null;
            private SpeedInfo _speedInfo = null;
            private TaskSchedulerEx _task = null;
            #endregion
    
            public Task Execute(IJobExecutionContext context)
            {
                return Task.Run(() =>
                {
                    try
                    {
                        _taskName = context.JobDetail.JobDataMap["taskName"].ToString();
                        _dbSourceConnectionString = context.JobDetail.JobDataMap["dbSourceConnectionString"].ToString();
                        _dbTargetConnectionString = context.JobDetail.JobDataMap["dbTargetConnectionString"].ToString();
                        _speedInfo = (SpeedInfo)context.JobDetail.JobDataMap["speedInfo"];
                        _task = (TaskSchedulerEx)context.JobDetail.JobDataMap["taskEx"];
    
                        _dbSource = new OracleHelper(_dbSourceConnectionString);
                        _dbTarget = new OracleHelper(_dbTargetConnectionString);
    
                        List<SYNC_DATA> list = ServiceHelper.Get<SyncDataDal>().GetList(_dbSourceConnectionString);
                        int oldCount = list.Count;
                        list = DictinctSyncData(list);
                        if (oldCount > 0 && oldCount != list.Count) LogUtil.Log(string.Format("SYNC_DATA集合去重前数量:{0} 去重后数量:{1}", oldCount, list.Count));
                        Dictionary<string, List<SYNC_DATA>> dict = list.ToLookup(a => a.TABLE_NAME).ToDictionary(a => a.Key, a => a.ToList()); //按表分组,对于单张表,必须用单线程,保证处理顺序
    
                        int syncCount = 0;
                        object lockObj = new object();
                        List<Task> taskList = new List<Task>();
                        foreach (string key in dict.Keys)
                        {
                            foreach (SYNC_DATA item in dict[key])
                            {
                                Task task = _task.Run((obj) =>
                                {
                                    SYNC_DATA syncData = obj as SYNC_DATA;
                                    string targetTableName = syncData.TABLE_NAME;
    
                                    //表名不一至的特殊情况
                                    if (ConfigUtil.SyncParam == 1) //部署在政务网侧 
                                    {
                                        if (syncData.TABLE_NAME == "VIPF_VIDEO_VIDEOSOURCE") targetTableName = "ZZ_VIDEO_VIDEOSOURCE1";
                                    }
    
                                    //查询并缓存目标数据库表字段信息集合
                                    List<DbColumnInfo> columnList = MemoryCacheUtil.TryGetValue<List<DbColumnInfo>>(_dbSourceConnectionString + targetTableName, () => _dbTarget.GetAllColumns(targetTableName));
    
                                    int row;
                                    SyncOneData(_dbSourceConnectionString, (SYNC_DATA)syncData, targetTableName, _dbSource, _dbTarget, columnList, out row);
                                    lock (lockObj) { syncCount += row; }
    
                                    _speedInfo.addCount();
                                }, item);
                                taskList.Add(task);
                            }
                        }
                        Task.WaitAll(taskList.ToArray());
                        if (syncCount > 0) LogUtil.Log(string.Format("本次成功同步 {0} 条数据", syncCount));
                    }
                    catch (Exception ex)
                    {
                        LogUtil.Error(ex, "数据同步任务 出错");
                    }
                });
            }
    
            #region 同步一条数据
            /// <summary>
            /// 同步一条数据
            /// </summary>
            private void SyncOneData(string connStr, SYNC_DATA item, string targetTableName, OracleHelper dbSource, OracleHelper dbTarget, List<DbColumnInfo> columnList, out int row)
            {
                bool isFault = false;
                DateTime dt = DateTime.Now;
                row = 0;
    
                try
                {
                    #region 通过表名查询字段集合
                    string columns = string.Join(",", columnList.ConvertAll<string>(a => a.columns_name));
                    string columnsForInsert = string.Join(",", columnList.ConvertAll<string>(a => ":" + a.columns_name));
                    string columnsForUpdate = string.Join(",", columnList.ConvertAll<string>(a => a.columns_name + "=:" + a.columns_name));
                    #endregion
    
                    if (item.COMMAND == INSERT || item.COMMAND == UPDATE)
                    {
                        //数据查询
                        DataTable sourceData = dbSource.Query(string.Format(@"select * from {0} where {1}", item.TABLE_NAME, CreateWhere(item.TABLE_KEY, item.DATA_ID)));
                        DataTable targetData = dbTarget.Query(string.Format(@"select * from {0} where {1}", targetTableName, CreateWhere(item.TABLE_KEY, item.DATA_ID)));
    
                        //判断摄像机表是否需要同步
                        if (item.TABLE_NAME == "PT_CAMERA_INFO")
                        {
                            if (!needSync(sourceData, targetData))
                            {
                                ServiceHelper.Get<SyncDataDal>().DeleteById(connStr, item.ID);
                                return;
                            }
                        }
    
                        if (sourceData != null && sourceData.Rows.Count > 0)
                        {
                            bool exists = targetData != null && targetData.Rows.Count > 0;
    
                            if ((item.COMMAND == INSERT && !exists) || (item.COMMAND == UPDATE && !exists))
                            {
                                #region 插入数据
                                string insertSql = string.Format("insert into {0}({1}) values({2})", targetTableName, columns, columnsForInsert);
    
                                #region 字段赋值
                                OracleParameter[] paramArr = new OracleParameter[columnList.Count];
    
                                for (int i = 0; i < columnList.Count; i++)
                                {
                                    DbColumnInfo columnInfo = columnList[i];
    
                                    paramArr[i] = new OracleParameter(columnInfo.columns_name, GetColumnData(sourceData, columnInfo.columns_name));
                                }
                                #endregion
    
                                row = dbTarget.ExecuteSql(insertSql, paramArr);
    
                                if (row > 0) LogUtil.Log(string.Format("{0} 表 {1} 插入一条数据,{2}", _taskName, targetTableName, CreateWhereLog(item.TABLE_KEY, item.DATA_ID)));
                                #endregion
                            }
    
                            if ((item.COMMAND == INSERT && exists) || (item.COMMAND == UPDATE && exists))
                            {
                                #region 更新数据
                                string updateSql = string.Format("update {0} set {1} where {2}", targetTableName, columnsForUpdate, CreateWhere(item.TABLE_KEY, item.DATA_ID));
    
                                #region 字段赋值
                                List<OracleParameter> paramList = new List<OracleParameter>();
    
                                foreach (DbColumnInfo columnInfo in columnList)
                                {
                                    paramList.Add(new OracleParameter(columnInfo.columns_name, GetColumnData(sourceData, columnInfo.columns_name)));
                                }
                                #endregion
    
                                row = dbTarget.ExecuteSql(updateSql, paramList.ToArray());
    
                                if (row > 0) LogUtil.Log(string.Format("{0} 表 {1} 更新一条数据,{2}", _taskName, targetTableName, CreateWhereLog(item.TABLE_KEY, item.DATA_ID)));
                                #endregion
                            }
                        }
                    }
    
                    if (item.COMMAND == DELETE)
                    {
                        #region 删除数据
                        string deleteSql = string.Format("delete from {0} where {1}", targetTableName, CreateWhere(item.TABLE_KEY, item.DATA_ID));
    
                        row = dbTarget.ExecuteSql(deleteSql);
    
                        if (row > 0) LogUtil.Log(string.Format("{0} 表 {1} 删除一条数据,{2}", _taskName, targetTableName, CreateWhereLog(item.TABLE_KEY, item.DATA_ID)));
                        #endregion
                    }
    
                    ServiceHelper.Get<SyncDataDal>().DeleteById(connStr, item.ID);
    
                }
                catch (Exception ex)
                {
                    isFault = true;
                    LogUtil.Error(ex, "同步数据失败,表名:" + item.TABLE_NAME.ToUpper() + " 命令:" + item.COMMAND + " 数据ID:" + item.DATA_ID);
                }
    
                try
                {
                    if (isFault)
                    {
                        SYNC_DATA sysSyncBs = ServiceHelper.Get<SyncDataDal>().GetById(connStr, item.ID);
                        sysSyncBs.FAIL_COUNT++;
                        ServiceHelper.Get<SyncDataDal>().Update(connStr, sysSyncBs);
                        LogUtil.Debug("更新业务数据同步表SYNC_DATA,ID:" + item.ID);
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex, "更新业务数据同步表SYNC_DATA出错");
                }
            }
            #endregion
    
            #region 判断是否是主键
            /// <summary>
            /// 判断是否是主键
            /// </summary>
            private bool IsIdField(string columnName, string idField)
            {
                if (idField.IndexOf(',') >= 0)
                {
                    string[] idFieldArr = idField.Split(',');
                    return idFieldArr.ToList().Exists(a => a == columnName);
                }
                else
                {
                    return columnName == idField;
                }
            }
            #endregion
    
            #region 根据主键生成Where条件语句
            /// <summary>
            /// 根据主键生成Where条件语句
            /// </summary>
            private string CreateWhere(string idField, string idData)
            {
                if (idField.IndexOf(',') >= 0)
                {
                    StringBuilder sb = new StringBuilder(" 1=1 ");
                    string[] idFieldArr = idField.Split(',');
                    string[] idDataArr = idData.Split(',');
                    for (int i = 0; i < idFieldArr.Length; i++)
                    {
                        sb.AppendFormat(" and {0}='{1}' ", idFieldArr[i], idDataArr[i]);
                    }
                    return sb.ToString();
                }
                else
                {
                    return string.Format("{0}='{1}'", idField, idData);
                }
            }
            #endregion
    
            #region 根据字段名从DataTable中获取数据
            /// <summary>
            /// 根据字段名从DataTable中获取数据
            /// </summary>
            private object GetColumnData(DataTable data, string columnName)
            {
                for (int i = 0; i < data.Columns.Count; i++)
                {
                    if (data.Columns[i].ColumnName == columnName)
                    {
                        return data.Rows[0][data.Columns[i]];
                    }
                }
                return DBNull.Value;
            }
            #endregion
    
            #region 根据主键生成Where条件语句日志信息
            /// <summary>
            /// 根据主键生成Where条件语句日志信息
            /// </summary>
            private string CreateWhereLog(string idField, string idData)
            {
                if (idField.IndexOf(',') >= 0)
                {
                    List<string> list = new List<string>();
                    string[] idFieldArr = idField.Split(',');
                    string[] idDataArr = idData.Split(',');
                    for (int i = 0; i < idFieldArr.Length; i++)
                    {
                        list.Add(string.Format("{0}:{1}", idFieldArr[i], idDataArr[i]));
                    }
                    return string.Join("", list.ToArray());
                }
                else
                {
                    return string.Format("{0}:{1}", idField, idData);
                }
            }
            #endregion
    
            #region 判断是否需要同步
            private bool needSync(DataTable sourceTable, DataTable targetTable)
            {
                HashSet<string> hashSet = new HashSet<string>() { "POLLING_TIME", "CAMERA_STATE" };
    
                if (targetTable == null || targetTable.Rows.Count == 0) return true;
    
                if (sourceTable != null && sourceTable.Rows.Count > 0 && targetTable != null && targetTable.Rows.Count > 0)
                {
                    foreach (DataColumn column in sourceTable.Columns)
                    {
                        if (targetTable.Columns.Contains(column.ColumnName))
                        {
                            if (sourceTable.Rows[0][column.ColumnName] != null && !sourceTable.Rows[0][column].Equals(targetTable.Rows[0][column.ColumnName]))
                            {
                                if (!hashSet.Contains(column.ColumnName))
                                {
                                    return true;
                                }
                            }
                        }
                    }
                }
                return false;
            }
            #endregion
    
            #region SYNC_DATA集合去重,对于按条件相同的数据,按顺序保留最后一条
            /// <summary>
            /// SYNC_DATA集合去重,对于按条件相同的数据,按顺序保留最后一条
            /// </summary>
            private List<SYNC_DATA> DictinctSyncData(List<SYNC_DATA> list)
            {
                List<SYNC_DATA> result = new List<SYNC_DATA>();
    
                for (int i = 0; i < list.Count; i++)
                {
                    SYNC_DATA dataI = list[i];
    
                    bool exists = false;
                    for (int j = i + 1; j < list.Count; j++)
                    {
                        SYNC_DATA dataJ = list[j];
    
                        if (dataI.TABLE_NAME == dataJ.TABLE_NAME
                            && dataI.TABLE_KEY == dataJ.TABLE_KEY
                            && dataI.DATA_ID == dataJ.DATA_ID)
                        {
                            exists = true;
                        }
                    }
    
                    if (!exists)
                    {
                        result.Add(dataI);
                    }
                }
    
                return result;
            }
            #endregion
    
        }
    
    }
    View Code
  • 相关阅读:
    团队冲刺二(6)
    团队冲刺二(5)
    JAVA面试中问及HIBERNATE与 MYBATIS的对比,在这里做一下总结
    解决ADB server didn't ACK问题,连上手机问题
    ADB server didn't ACK
    android错误
    Andy
    在Eclipse下搭建Android开发环境教程
    VM VirtualBox 安装 Android 4.3虚拟机完整教程
    电脑安装Android4.0虚拟机的做法
  • 原文地址:https://www.cnblogs.com/s0611163/p/15098620.html
Copyright © 2011-2022 走看看