Jan 5, 2008
Introduction: Data Synchronization in SQL Express
The data synchronization feature is available only in SQL Server, not in SQL Express. To mimic that feature in SQL Express, we currently use the TableDiff utility method. This document proposes a new Stored Procedure method that speeds up the entire data synchronization process and overcomes some issues found in the TableDiff utility method.
Data Synchronization using TableDiff utility method:
We can use the TableDiff utility to generate a Transact-SQL script (containing delete/insert/update statements) that fixes discrepancies at the destination server and brings the source and destination tables into convergence. Since this utility compares one table at a time, we need to call it in a loop when there are N tables to synchronize. Within the loop, we accumulate (i.e., append) the generated Transact-SQL script into a local file (say CompleteFixSQL.sql). At the end of the loop, we have a complete script file that needs to be executed at the destination server. The sqlcmd utility can be used to run that script file (CompleteFixSQL.sql) against the destination server to bring the source and destination tables into convergence.
However, there are some drawbacks to using these utilities:
•Both TableDiff and sqlcmd are external applications that must be called from client application code. TableDiff must be called N times to synchronize N tables, which incurs I/O overhead.
•The sqlcmd utility executes the statements in the CompleteFixSQL script one by one, in sequence, which is time consuming when a large amount of data has to be synchronized at the destination.
•The TableDiff utility has some limitations: it does not generate a fix script for LOB data types such as text, ntext and image.
Data Synchronization using Stored Procedure:
A Stored Procedure (SP) can be used to find the differences between the source and destination tables and then synchronize the destination tables with the source table data. To bring the source and destination tables into convergence:
1.Find the records that need to be deleted from Destination database table
2.Find the records that need to be inserted into Destination database table
3.Find the records that need to be updated in the Destination database table
Subsequently, we have to execute delete, insert and update statements in the destination database for the records that are found in the above steps 1, 2 & 3 respectively.
The advantages of this SP method over the TableDiff method are:
•The stored procedure is already compiled and stored within the destination database. Client application code just needs to call this SP over its database connection.
•Table comparison and synchronization are both done in a single query (one each for delete, insert and update).
•Records are processed (deleted, inserted & updated) in bulk manner.
•LOB datatypes are supported
Step 1: Records to be deleted from Destination database table
•Select the records that do not exist in the Source database table, but exist in the Destination database table
•Then delete them from the Destination database table
delete DestinationDBTable
from
    DestinationDB.dbo.TableName as DestinationDBTable
where
    not exists
    (select
        1
    from
        SourceDB.dbo.TableName as SourceDBTable
    where
        SourceDBTable.PrimaryColumnName1 = DestinationDBTable.PrimaryColumnName1 and
        SourceDBTable.PrimaryColumnName2 = DestinationDBTable.PrimaryColumnName2 and
        ...
        SourceDBTable.PrimaryColumnNameN = DestinationDBTable.PrimaryColumnNameN
    )
If the table contains an Identity column, we can simply use that column instead of the primary key columns in the join condition of the WHERE clause, provided the identity values uniquely identify the rows and are the same in the source and destination tables. This shortens the join condition, especially when the table has a composite primary key as well as an Identity column, and it can also improve the performance of the delete statement:
delete DestinationDBTable
from
    DestinationDB.dbo.TableName as DestinationDBTable
where
    not exists
    (select
        1
    from
        SourceDB.dbo.TableName as SourceDBTable
    where
        SourceDBTable.IdentityColumn = DestinationDBTable.IdentityColumn
    )
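As a concrete illustration of Step 1, the sketch below runs the same NOT EXISTS delete against two temporary tables. The tables #SourceCustomer and #DestinationCustomer and their columns are hypothetical stand-ins for the source and destination database tables; a real run would use the three-part (or linked server) names shown above.

```sql
-- Hypothetical stand-ins for SourceDB.dbo.Customer and DestinationDB.dbo.Customer
create table #SourceCustomer      (CustomerId int primary key, Name varchar(50))
create table #DestinationCustomer (CustomerId int primary key, Name varchar(50))

insert into #SourceCustomer      (CustomerId, Name) values (1, 'Ann')
insert into #SourceCustomer      (CustomerId, Name) values (2, 'Bob')
insert into #DestinationCustomer (CustomerId, Name) values (1, 'Ann')
insert into #DestinationCustomer (CustomerId, Name) values (3, 'Stale row')

-- Remove destination rows whose key no longer exists in the source
delete d
from
    #DestinationCustomer as d
where
    not exists
    (select 1
     from #SourceCustomer as s
     where s.CustomerId = d.CustomerId)

-- CustomerId 3 has no match in the source, so it should be gone now
select CustomerId, Name from #DestinationCustomer

drop table #SourceCustomer
drop table #DestinationCustomer
```

Note that the delete names the alias (d) as its target and introduces the table in the FROM clause; Transact-SQL does not accept an alias placed directly after the table name in "delete from".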
Step 2: Records to be inserted into Destination database table
•Select the records that exist in the Source database table, but do not exist in the Destination database table
•Then insert them into the Destination database table
insert into
    DestinationDB.dbo.TableName
    (ColumnList)
select
    ColumnList
from
    SourceDB.dbo.TableName as SourceDBTable
where
    not exists
    (select
        1
    from
        DestinationDB.dbo.TableName as DestinationDBTable
    where
        DestinationDBTable.PrimaryColumnName1 = SourceDBTable.PrimaryColumnName1 and
        DestinationDBTable.PrimaryColumnName2 = SourceDBTable.PrimaryColumnName2 and
        ...
        DestinationDBTable.PrimaryColumnNameN = SourceDBTable.PrimaryColumnNameN
    )
Columns with the TimeStamp data type must be excluded from the column list of the above insert statement, because we cannot explicitly set a value for a TimeStamp column.
As described in Step 1, we can use the identity column instead of the primary key columns, as follows:
insert into
    DestinationDB.dbo.TableName
    (ColumnList)
select
    ColumnList
from
    SourceDB.dbo.TableName as SourceDBTable
where
    not exists
    (select
        1
    from
        DestinationDB.dbo.TableName as DestinationDBTable
    where
        DestinationDBTable.IdentityColumn = SourceDBTable.IdentityColumn
    )
If the table has an identity column, the above insert statement must be enclosed by "set identity_insert on/off" as follows:
set identity_insert TableName On
... above insert statement
set identity_insert TableName off
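To make Step 2 concrete, here is a minimal sketch that copies missing rows into a table with an identity column. The tables #SourceOrder and #DestinationOrder are hypothetical; they only stand in for the source and destination database tables.

```sql
-- Hypothetical tables, each with an identity column as the key
create table #SourceOrder      (OrderId int identity(1,1) primary key, Amount money)
create table #DestinationOrder (OrderId int identity(1,1) primary key, Amount money)

insert into #SourceOrder      (Amount) values (10)   -- gets OrderId 1
insert into #SourceOrder      (Amount) values (20)   -- gets OrderId 2
insert into #DestinationOrder (Amount) values (10)   -- gets OrderId 1

-- Allow explicit values for the identity column during the copy
set identity_insert #DestinationOrder on

insert into #DestinationOrder
    (OrderId, Amount)            -- an explicit column list is required here
select
    s.OrderId, s.Amount
from
    #SourceOrder as s
where
    not exists
    (select 1
     from #DestinationOrder as d
     where d.OrderId = s.OrderId)

set identity_insert #DestinationOrder off

-- The destination should now also contain OrderId 2
select OrderId, Amount from #DestinationOrder

drop table #SourceOrder
drop table #DestinationOrder
```

Only one table per session can have identity_insert switched on at a time, which is why the setting is turned off again immediately after the insert.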
Step 3: Records to be updated in the Destination database table
•Select the records that differ between the Source and Destination database tables
•Then update them in the Destination database table with the source database table data
update
    DestinationDBTable
set
    ColumnName1 = SourceDBTable.ColumnName1,
    ColumnName2 = SourceDBTable.ColumnName2,
    ...
    ColumnNameN = SourceDBTable.ColumnNameN
from
    DestinationDB.dbo.TableName as DestinationDBTable
    inner join
    (
        -- rows that appear exactly once in the union and come from the source
        -- are rows that are new or changed in the source
        select
            max(TableName) as TableName, ColumnList
        from
        (
            select
                'SourceTableName' as TableName, ColumnList
            from
                SourceDB.dbo.TableName
            union all
            select
                'DestinationTableName' as TableName, ColumnList
            from
                DestinationDB.dbo.TableName
        ) as AliasName
        group by
            ColumnList
        having
            count(*) = 1
            and max(TableName) = 'SourceTableName'
    ) as SourceDBTable
    on
        SourceDBTable.PrimaryColumnName1 = DestinationDBTable.PrimaryColumnName1 and
        SourceDBTable.PrimaryColumnName2 = DestinationDBTable.PrimaryColumnName2 and
        ...
        SourceDBTable.PrimaryColumnNameN = DestinationDBTable.PrimaryColumnNameN
Columns with the TimeStamp data type must be excluded from the SET clause of the above update statement, because we cannot explicitly set a value for a TimeStamp column.
Columns with LOB data types (text, ntext and image) must be converted to the respective large value data types [varchar(max), nvarchar(max) and varbinary(max)] in the queries that are combined by UNION ALL in the above statement. This is because the GROUP BY applied to the combined result has to compare column values, and the LOB data types cannot be compared or sorted.
As described in Steps 1 and 2, we can use the identity column instead of the primary key columns in the join condition of the update statement.
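The following sketch applies the Step 3 update to a small example that includes a text column, so that the conversion to varchar(max) is visible. The tables #SourceNote and #DestinationNote and their columns are hypothetical stand-ins for the source and destination database tables.

```sql
-- Hypothetical tables; Body is a LOB (text) column
create table #SourceNote      (NoteId int primary key, Title varchar(50), Body text)
create table #DestinationNote (NoteId int primary key, Title varchar(50), Body text)

insert into #SourceNote      (NoteId, Title, Body) values (1, 'Same',      'unchanged')
insert into #SourceNote      (NoteId, Title, Body) values (2, 'New title', 'new body')
insert into #DestinationNote (NoteId, Title, Body) values (1, 'Same',      'unchanged')
insert into #DestinationNote (NoteId, Title, Body) values (2, 'Old title', 'old body')

update d
set
    Title = s.Title,
    Body  = s.Body               -- varchar(max) converts implicitly back to text
from
    #DestinationNote as d
    inner join
    (
        -- A row seen exactly once, and only on the source side, differs
        select max(TableName) as TableName, NoteId, Title, Body
        from
        (
            select 'Source' as TableName, NoteId, Title,
                   convert(varchar(max), Body) as Body   -- text cannot be grouped
            from #SourceNote
            union all
            select 'Destination' as TableName, NoteId, Title,
                   convert(varchar(max), Body) as Body
            from #DestinationNote
        ) as u
        group by NoteId, Title, Body
        having count(*) = 1 and max(TableName) = 'Source'
    ) as s
    on s.NoteId = d.NoteId

-- NoteId 2 should now carry the source title and body; NoteId 1 is untouched
select NoteId, Title, convert(varchar(max), Body) as Body from #DestinationNote

drop table #SourceNote
drop table #DestinationNote
```

Identical rows appear twice in the union (count(*) = 2) and are filtered out, so only rows that actually differ are rewritten.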
Stored procedure for Data Synchronization:
The Data Synchronization - Beta Version script below contains the following view and stored procedures (SP) that implement the data synchronization:
1.v_DTS_ColumnInformation
2.stp_DTS_GetCommaSeperatedColumnString
3.stp_DTS_GetIdentityOrPrimaryKeyColumnDetails
4.stp_DTS_SetDestinationColumnWithSourceColumnString
5.stp_DTS_DataSynchronization
v_DTS_ColumnInformation: This view returns column details such as data type, primary key membership, nullability, identity property and column size (length for character data types, precision and scale for numeric data types).
stp_DTS_GetCommaSeperatedColumnString: This SP generates several variants of the comma separated column string for a given table.
stp_DTS_GetIdentityOrPrimaryKeyColumnDetails: This SP generates various strings for the Identity or Primary key columns of a given table.
stp_DTS_SetDestinationColumnWithSourceColumnString: This SP generates the SET clause for the update statement described in Step 3.
stp_DTS_DataSynchronization: This is the main SP, which synchronizes the destination tables with the source table data.
Details of the parameters used in all of the above procedures are described along with the header of each SP.
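The helper procedures build their strings with a single SELECT that appends to a variable for each row. The sketch below shows that idiom in isolation; the table variable @Columns and its contents are hypothetical and merely stand in for the rows that v_DTS_ColumnInformation would return for one table.

```sql
-- Hypothetical column metadata for one table
declare @Columns table (ColumnName sysname, DataType sysname)
insert into @Columns (ColumnName, DataType) values ('OrderId',    'int')
insert into @Columns (ColumnName, DataType) values ('Amount',     'money')
insert into @Columns (ColumnName, DataType) values ('RowVersion', 'timestamp')

declare @ColumnString varchar(max)
set @ColumnString = ''           -- must start non-null, or the result stays null

-- Append one bracketed column name per row, with a separator after the first
select
    @ColumnString = @ColumnString
        + case when len(@ColumnString) > 0 then ', ' else '' end
        + '[' + ColumnName + ']'
from
    @Columns
where
    DataType != 'timestamp'      -- timestamp columns cannot be set explicitly

select @ColumnString as ColumnString
```

The order in which rows are appended is not guaranteed by this idiom. That is acceptable here as long as the same generated string is used on both sides of a statement, for example in the insert column list and in the matching select list.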
Assumptions:
•The schemas of the source and destination tables are identical.
•The source and destination are different data sources.
•The destination server has a linked server to the source server if the two are on different servers.
•All the SPs and the view listed above are stored and compiled in the destination database.
•All Foreign key constraints and Triggers (that affect other tables) of the destination tables are disabled before executing stp_DTS_DataSynchronization.
•The main SP stp_DTS_DataSynchronization is executed at the destination database.
•The user calls the main SP stp_DTS_DataSynchronization with valid parameters.
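The preparation that these assumptions imply can be sketched as follows. This is a template rather than a tested setup script: the linked server name 'SourceServerInstance' and the table dbo.TableName are placeholders, and the statements would have to be repeated for every destination table involved.

```sql
-- One-time setup: register the source instance as a linked server
-- ('SourceServerInstance' is a placeholder for the real instance name)
exec sp_addlinkedserver
     @server     = N'SourceServerInstance'
    ,@srvproduct = N'SQL Server'

-- Before synchronization: disable constraints and triggers on a destination table
alter table dbo.TableName nocheck constraint all   -- foreign key and check constraints
alter table dbo.TableName disable trigger all

-- ... execute stp_DTS_DataSynchronization here ...

-- After synchronization: enable triggers again and re-validate the constraints
alter table dbo.TableName enable trigger all
alter table dbo.TableName with check check constraint all
```

Foreign keys that reference a table are defined on the referencing tables, so those tables need the same nocheck/check treatment. Using "with check" when re-enabling makes SQL Server verify the existing rows, which surfaces any integrity problem introduced while the constraints were off.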
Limitation:
•Columns with the Timestamp data type are excluded from the data synchronization.
Features of future version:
•The Stored Procedure will be extended to include parameter validation.
•It will be extended to disable the Foreign key constraints and Triggers of the destination tables before starting synchronization. After the data synchronization completes (whether it succeeds or fails), all Foreign key constraints and Triggers will be enabled again to maintain data integrity.
•It will be extended to provide a log of the data synchronization process, so that the user can see how many records were deleted/inserted/updated and which ones they were. Hint: The OUTPUT clause can be used to achieve this.
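As a rough idea of how the OUTPUT clause could support such a log, the sketch below captures the rows removed by a Step 1 style delete. The tables and the @DeletedLog table variable are hypothetical; this is not how the current beta script is written.

```sql
-- Hypothetical source and destination tables
create table #SourceCustomer      (CustomerId int primary key, Name varchar(50))
create table #DestinationCustomer (CustomerId int primary key, Name varchar(50))

insert into #SourceCustomer      (CustomerId, Name) values (1, 'Ann')
insert into #DestinationCustomer (CustomerId, Name) values (1, 'Ann')
insert into #DestinationCustomer (CustomerId, Name) values (3, 'Stale row')

-- Receives a copy of every deleted row
declare @DeletedLog table (CustomerId int, Name varchar(50))

delete d
output deleted.CustomerId, deleted.Name
    into @DeletedLog (CustomerId, Name)
from
    #DestinationCustomer as d
where
    not exists
    (select 1
     from #SourceCustomer as s
     where s.CustomerId = d.CustomerId)

-- How many rows were deleted, and which ones
select count(*) as DeletedRowCount from @DeletedLog
select CustomerId, Name from @DeletedLog

drop table #SourceCustomer
drop table #DestinationCustomer
```

The same pattern works for insert (using the inserted pseudo-table) and for update (where deleted holds the old values and inserted the new ones).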
Conclusion:
Data synchronization using the Stored Procedure method is faster than the TableDiff method. Genuine feedback from readers will help in arriving at a better solution than the method proposed here. Together we will meet that goal.
References:
1.The shortest, fastest, and easiest way to compare two tables in SQL Server: UNION (http://weblogs.sqlteam.com/jeffs/archive/2004/11/10/2737.aspx)
2.TableDiff Utility (http://technet.microsoft.com/en-us/library/ms162843.aspx)
Acknowledgment:
My sincere thanks to all the experts who participated and spent their valuable time discussing table comparison techniques on Jeff's SQL Server Blog (http://weblogs.sqlteam.com/jeffs/archive/2004/11/10/2737.aspx).
•Jeff: For his UNION ALL method to compare tables
•Click: For his NOT EXISTS method to compare tables
•David L. Penton: For his explanation of the issues found in NOT EXISTS method
•John: For his powerful coding to generate comma separated list with a single SELECT
Data Synchronization - Beta Version.sql - Script
/**********************************************************************
' FILE NAME : v_DTS_ColumnInformation.sql
' VERSION : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY : Ganesan Krishnan
' DESCRIPTION : This view populates column details
' This view is used by most of the stp_DTS_xxx stored procedures
' COLUMN : 1. Table Name
' 2. Column Name
' 3. Data Type
' 4. A flag indicating whether the column is part of the Primary Key
' 5. A flag indicating whether the column is nullable
' 6. A flag indicating whether the column is an identity column
' 7. Length of character data type column
' 8. Precision of numeric data type column
' 9. Scale of numeric data type column
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Retrieve column information
select * from v_DTS_ColumnInformation
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].[v_DTS_ColumnInformation]') )
drop view [dbo].[v_DTS_ColumnInformation]
GO
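create view v_DTS_ColumnInformation
as
select
TableName = isc.table_name
,ColumnName = isc.column_name
,DataType = isc.data_type
,IsPrimaryKey = case when pk.column_name is null then 0 else 1 end
,IsNullable = sc.isnullable
-- status is a bitmask; bit 0x80 (128) marks an identity column
,IsIdentity = case when (sc.status & 128) = 128 then 1 else 0 end
,CharacterMaximumLength = isc.Character_Maximum_Length
,NumericPrecision = isc.Numeric_Precision
,NumericScale = isc.Numeric_Scale
from
information_schema.columns isc
-- key_column_usage also lists foreign key and unique constraint columns,
-- so restrict it to PRIMARY KEY constraints before joining
left outer join
(select iskcu.table_name, iskcu.column_name
from information_schema.key_column_usage iskcu
inner join information_schema.table_constraints istc
on iskcu.constraint_name = istc.constraint_name and iskcu.table_name = istc.table_name
where istc.constraint_type = 'PRIMARY KEY'
) pk
on isc.table_name = pk.table_name and isc.column_name = pk.column_name
inner join sysobjects so
on isc.table_name = so.name
inner join syscolumns sc
on so.id = sc.id and isc.column_name = sc.name
go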
/**********************************************************************
' FILE NAME : stp_DTS_GetCommaSeperatedColumnString.sql
' VERSION : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY : Ganesan Krishnan
' DESCRIPTION : This stored procedure generates the comma separated
' column string for a given table
' This SP is called from stp_DTS_DataSynchronization
' PARAMETERS : 1. Table Name (Input)
' 2. Column String (Output)
' 3. Column String without TimeStamp datatype column (Output)
' 4. Column String with casting LOB to Large Value Datatype (Output)
' 5. Column String with casting Large Value to LOB Datatype (Output)
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Execute stp_DTS_GetCommaSeperatedColumnString procedure
declare @v_ColumnString varchar(max), @v_ColumnStringWithoutTimeStampDataType varchar(max) , @v_ColumnStringWithCastingLOBToLargeValueDataType varchar(max), @v_ColumnStringWithCastingLargeValueToLOBDataType varchar(max)
exec dbo.stp_DTS_GetCommaSeperatedColumnString 'TableName',@v_ColumnString out, @v_ColumnStringWithoutTimeStampDataType out, @v_ColumnStringWithCastingLOBToLargeValueDataType out, @v_ColumnStringWithCastingLargeValueToLOBDataType out
select
ColumnString = @v_ColumnString
,ColumnStringWithoutTimeStampDataType = @v_ColumnStringWithoutTimeStampDataType
,ColumnStringWithCastingLOBToLargeValueDataType = @v_ColumnStringWithCastingLOBToLargeValueDataType
,ColumnStringWithCastingLargeValueToLOBDataType = @v_ColumnStringWithCastingLargeValueToLOBDataType
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].stp_DTS_GetCommaSeperatedColumnString') and OBJECTPROPERTY(id, N'IsProcedure') = 1)
drop procedure [dbo].[stp_DTS_GetCommaSeperatedColumnString]
GO
create procedure stp_DTS_GetCommaSeperatedColumnString
(@p_TableName varchar(254)
,@p_ColumnString varchar(max) out
,@p_ColumnStringWithoutTimeStampDataType varchar(max) out
,@p_ColumnStringWithCastingLOBToLargeValueDataType varchar(max) out
,@p_ColumnStringWithCastingLargeValueToLOBDataType varchar(max) out
)
as
begin
set nocount on
select
@p_ColumnString = ''
,@p_ColumnStringWithCastingLOBToLargeValueDataType = ''
,@p_ColumnStringWithCastingLargeValueToLOBDataType = ''
,@p_ColumnStringWithoutTimeStampDataType = ''
select
@p_ColumnString = @p_ColumnString +
case
when len(@p_ColumnString)>0 then ', '
else ''
end +
'[' + ColumnName + ']'
,@p_ColumnStringWithoutTimeStampDataType = @p_ColumnStringWithoutTimeStampDataType +
case
when len(@p_ColumnStringWithoutTimeStampDataType)>0 and DataType != 'timestamp' then ', '
else ''
end +
case
when DataType = 'timestamp' then ''
else '[' + ColumnName + ']'
end
,@p_ColumnStringWithCastingLOBToLargeValueDataType = @p_ColumnStringWithCastingLOBToLargeValueDataType +
case
when len(@p_ColumnStringWithCastingLOBToLargeValueDataType)>0 then ', '
else ''
end +
case
when DataType = 'image' then 'convert(varbinary(max),[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
when DataType = 'text' then 'convert(varchar(max),[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
when DataType in ('ntext','xml') then 'convert(nvarchar(max),[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
else '[' + ColumnName + ']'
end
,@p_ColumnStringWithCastingLargeValueToLOBDataType = @p_ColumnStringWithCastingLargeValueToLOBDataType +
case
when len(@p_ColumnStringWithCastingLargeValueToLOBDataType)>0 then ', '
else ''
end +
case
when DataType = 'image' then 'convert(image,[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
when DataType = 'text' then 'convert(text,[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
when DataType = 'ntext' then 'convert(ntext,[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
when DataType = 'xml' then 'convert(xml,[' + ColumnName + ']) as ' + '[' + ColumnName + ']'
else '[' + ColumnName + ']'
end
from
v_DTS_ColumnInformation
where
TableName = @p_TableName
end
go
/**********************************************************************
' FILE NAME : stp_DTS_GetIdentityOrPrimaryKeyColumnDetails.sql
' VERSION : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY : Ganesan Krishnan
' DESCRIPTION : This stored procedure generates the various strings
' for the Identity or Primary Key columns of a given table
' This SP is called from stp_DTS_DataSynchronization
' PARAMETERS : 1. Table Name (Input)
' 2. Equi Join String (Output)
' 3. Column Structure String for the temporary table (Output)
' 4. Comma separated column string for DELETED table (Output)
' 5. Comma separated column string for INSERTED table (Output)
' 6. Comma separated column string for temporary table that holds detailed log (Output)
' 7. Comma separated column string of Identity or Primary key column (Output)
' 8. A flag indicating whether the table contains an identity column (Output)
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Executes stp_DTS_GetIdentityOrPrimaryKeyColumnDetails procedure
declare
@p_EquiJoinString varchar(max),@p_StructureString varchar(max),@p_DeletedString varchar(max),@p_InsertedString varchar(max),@p_TmpTableColumnListString varchar(max),@p_IdentityOrPrimaryKeyColumnString varchar(max),@p_IsIdentity bit
exec stp_DTS_GetIdentityOrPrimaryKeyColumnDetails 'TableName',@p_EquiJoinString OUT, @p_StructureString OUT, @p_DeletedString OUT, @p_InsertedString OUT, @p_TmpTableColumnListString OUT,@p_IdentityOrPrimaryKeyColumnString out, @p_IsIdentity OUT
select
EquiJoinString = @p_EquiJoinString
,StructureString = @p_StructureString
,DeletedString = @p_DeletedString
,InsertedString = @p_InsertedString
,TmpTableColumnListString = @p_TmpTableColumnListString
,IdentityOrPrimaryKeyColumnString = @p_IdentityOrPrimaryKeyColumnString
,IsIdentity = @p_IsIdentity
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].[stp_DTS_GetIdentityOrPrimaryKeyColumnDetails]') and OBJECTPROPERTY(id, N'IsProcedure') = 1)
drop procedure [dbo].[stp_DTS_GetIdentityOrPrimaryKeyColumnDetails]
GO
create procedure [dbo].[stp_DTS_GetIdentityOrPrimaryKeyColumnDetails]
(@p_TableName varchar(254)
,@p_EquiJoinString varchar(max) out
,@p_StructureString varchar(max) out
,@p_DeletedString varchar(max) out
,@p_InsertedString varchar(max) out
,@p_TmpTableColumnListString varchar(max) out
,@p_IdentityOrPrimaryKeyColumnString varchar(max) out
,@p_IsIdentity bit out
)
as
begin
set nocount on
select
@p_EquiJoinString = ''
,@p_StructureString = ''
,@p_DeletedString = ''
,@p_InsertedString = ''
,@p_TmpTableColumnListString = ''
,@p_IdentityOrPrimaryKeyColumnString = ''
,@p_IsIdentity=1
-- Construct the EquiJoinString using identity column if the table contains Identity column
select
@p_EquiJoinString = @p_EquiJoinString +
case
when len(@p_EquiJoinString)>0 then ' and '
else ''
end +
@p_TableName + '.[' + ColumnName + '] = SrcDBTable.[' + ColumnName + ']'
,@p_StructureString = @p_StructureString +
case
when len(@p_StructureString)>0 then ', '
else ''
end +
'[' + ColumnName + ']' + ' ' +
CASE DataType
WHEN 'char' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
WHEN 'nchar' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
WHEN 'varchar' THEN Datatype + '('+convert(varchar, CharacterMaximumLength) + ')'
WHEN 'nvarchar' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
WHEN 'decimal' THEN Datatype +'('+ convert(varchar, NumericPrecision) + ',' +
convert(varchar, NumericScale) + ')'
WHEN 'numeric' THEN Datatype +'('+ convert(varchar, NumericPrecision) + ',' +
convert(varchar, NumericScale) + ')'
WHEN 'float' THEN Datatype +'('+ convert(varchar, NumericPrecision) + ')'
WHEN 'binary' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
WHEN 'varbinary' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
else
DataType
END
,@p_DeletedString = @p_DeletedString +
case
when len(@p_DeletedString)>0 then ', '
else ''
end +
'deleted.[' + ColumnName + ']'
,@p_InsertedString = @p_InsertedString +
case
when len(@p_InsertedString)>0 then ', '
else ''
end +
'inserted.[' + ColumnName + ']'
,@p_TmpTableColumnListString = @p_TmpTableColumnListString +
case
when len(@p_TmpTableColumnListString)>0 then '+'',''+'
else ''
end +
'convert(varchar(max),[' + ColumnName + '])'
,@p_IdentityOrPrimaryKeyColumnString = @p_IdentityOrPrimaryKeyColumnString +
case
when len(@p_IdentityOrPrimaryKeyColumnString)>0 then ', '
else ''
end +
'[' + ColumnName + ']'
from
v_DTS_ColumnInformation
where
TableName = @p_TableName
and IsIdentity = 1 -- Include only Identity column
-- Construct the EquiJoinString using the primary key columns if the table does not contain an Identity column
if @p_EquiJoinString = ''
begin
set @p_IsIdentity = 0
select
@p_EquiJoinString = @p_EquiJoinString +
case
when len(@p_EquiJoinString)>0 then ' and '
else ''
end +
@p_TableName + '.[' + ColumnName + '] = SrcDBTable.[' + ColumnName + ']'
,@p_StructureString = @p_StructureString +
case
when len(@p_StructureString)>0 then ', '
else ''
end +
'[' + ColumnName + ']' + ' ' +
CASE DataType
WHEN 'char' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
WHEN 'nchar' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
WHEN 'varchar' THEN Datatype + '('+convert(varchar, CharacterMaximumLength) + ')'
WHEN 'nvarchar' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
WHEN 'decimal' THEN Datatype +'('+ convert(varchar, NumericPrecision) + ',' +
convert(varchar, NumericScale) + ')'
WHEN 'numeric' THEN Datatype +'('+ convert(varchar, NumericPrecision) + ',' +
convert(varchar, NumericScale) + ')'
WHEN 'float' THEN Datatype +'('+ convert(varchar, NumericPrecision) + ')'
WHEN 'binary' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
WHEN 'varbinary' THEN Datatype +'('+ convert(varchar, CharacterMaximumLength) + ')'
else
DataType
END
,@p_DeletedString = @p_DeletedString +
case
when len(@p_DeletedString)>0 then ', '
else ''
end +
'deleted.[' + ColumnName + ']'
,@p_InsertedString = @p_InsertedString +
case
when len(@p_InsertedString)>0 then ', '
else ''
end +
'inserted.[' + ColumnName + ']'
,@p_TmpTableColumnListString = @p_TmpTableColumnListString +
case
when len(@p_TmpTableColumnListString)>0 then '+'',''+'
else ''
end +
'convert(varchar(max),[' + ColumnName + '])'
,@p_IdentityOrPrimaryKeyColumnString = @p_IdentityOrPrimaryKeyColumnString +
case
when len(@p_IdentityOrPrimaryKeyColumnString)>0 then ', '
else ''
end +
'[' + ColumnName + ']'
from
v_DTS_ColumnInformation
where
TableName = @p_TableName
and IsPrimaryKey = 1 -- Include only primary key column
end
end
go
/**********************************************************************
' FILE NAME : stp_DTS_SetDestinationColumnWithSourceColumnString.sql
' VERSION : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY : Ganesan Krishnan
' DESCRIPTION : This stored procedure generates the set string
' of an update statement for a given table
' This excludes TimeStamp datatype columns
' This SP is called from stp_DTS_DataSynchronization
' PARAMETERS : 1. Table Name (Input)
' 2. Set String (Output)
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Execute stp_DTS_SetDestinationColumnWithSourceColumnString procedure
declare @v_SetString varchar(max)
exec stp_DTS_SetDestinationColumnWithSourceColumnString 'TableName', @v_SetString out
select @v_SetString
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].[stp_DTS_SetDestinationColumnWithSourceColumnString]') and OBJECTPROPERTY(id, N'IsProcedure') = 1)
drop procedure [dbo].[stp_DTS_SetDestinationColumnWithSourceColumnString]
GO
create procedure stp_DTS_SetDestinationColumnWithSourceColumnString
(@p_TableName varchar(254)
,@p_SetString varchar(max) out
)
as
begin
set nocount on
set @p_SetString = ''
select
@p_SetString = @p_SetString +
case
when len(@p_SetString)>0 then ', '
else ''
end +
'[' + ColumnName + '] = SrcDBTable.[' + ColumnName + ']'
from
v_DTS_ColumnInformation
where
TableName = @p_TableName
and IsPrimaryKey = 0 -- Include only non key column
and IsIdentity = 0 -- Include only non identity column
and DataType != 'timestamp' -- Exclude TimeStamp data type column
end
go
/**********************************************************************
' FILE NAME : stp_DTS_DataSynchronization.sql
' VERSION : Beta version
' CREATED DATE : 05-Jan-2008
' WRITTEN BY : Ganesan Krishnan
' DESCRIPTION : This stored procedure synchronizes the destination tables
' with the source table data
' This SP should be executed at the destination database
' PARAMETERS : 1. Linked Server Name of the Source Database (Input)
' 2. Source Database Name (Input)
' 3. Comma separated list of tables to be synchronized (Input)
' 4. Program Mode (Input)
' -1 -> Table Compare only
' 0 -> Table Compare & Data Synchronization
' 1 -> Data Synchronization only (default)
'*********************************************************************
' Modification History
'*********************************************************************/
/*
-- Execute Data Synchronization procedure at destination database to sync Destination table with Source table data
-- Table Compare Only
-- exec [DestinationServerInstance].DestinationDB.dbo.stp_DTS_DataSynchronization 'SourceServerInstance','SourceDBName','TableName1,TableName2,TableName3',-1
-- Table Compare & Data Synchronization
-- exec [DestinationServerInstance].DestinationDB.dbo.stp_DTS_DataSynchronization 'SourceServerInstance','SourceDBName','TableName1,TableName2,TableName3',0
-- Data Synchronization Only
-- exec [DestinationServerInstance].DestinationDB.dbo.stp_DTS_DataSynchronization 'SourceServerInstance','SourceDBName','TableName1,TableName2,TableName3',1
*/
if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].[stp_DTS_DataSynchronization]') and OBJECTPROPERTY(id, N'IsProcedure') = 1)
drop procedure [dbo].[stp_DTS_DataSynchronization]
GO
create procedure stp_DTS_DataSynchronization
(
@p_LinkedServerNameofSourceDatabase varchar(254)
,@p_SourceDatabaseName varchar(254)
,@p_TablesToBeSynchronized varchar(max)
,@p_ProgramMode int = 1
)
as
begin
set nocount on
declare
@v_TableName varchar(254)
,@i_CommaPosition int
,@i_StringLength int
,@i_StartPosition int
,@i_EndPosition int
,@v_sql nvarchar(max)
,@v_ColumnList varchar(max)
,@v_ColumnlistWithoutTimeStampDataType varchar(max)
,@v_ColumnStringWithCastingLOBToLargeValueDataType varchar(max)
,@v_ColumnStringWithCastingLargeValueToLOBDataType varchar(max)
,@v_EquiJoinString varchar(max)
,@v_StructureString varchar(max)
,@v_DeletedString varchar(max)
,@v_InsertedString varchar(max)
,@v_TmpTableColumnListString varchar(max)
,@v_IdentityOrPrimaryKeyColumnString varchar(max)
,@v_SetDestinationColumnWithSourceColumnString varchar(max)
,@b_IsTableHavingIdentityColumn bit
,@i_errorcode int
,@v_ErrorMessage varchar(400)
,@i_return int
,@v_newline varchar(1)
,@v_DetailedLog varchar(max)
,@i_DeletedRowCount int
,@i_InsertedRowCount int
,@i_UpdatedRowCount int
,@i_TotalDeletedRowCount int
,@i_TotalInsertedRowCount int
,@i_TotalUpdatedRowCount int
,@i_TotalRowsSynchronized int
,@v_Line varchar(80)
,@v_Line1 varchar(80)
,@i_LogColumnWidth int
,@d_StartDate datetime
,@d_EndDate datetime
,@v_FullyQualifiedSourceTableName varchar(254)
,@v_FullyQualifiedDestinationTableName varchar(254)
,@v_LinkedServerNameofDestinationDatabase varchar(254)
,@v_DestinationDatabaseName varchar(254)
,@v_FullyQualifiedSourceDatabaseName varchar(254)
,@v_FullyQualifiedDestinationDatabaseName varchar(254)
,@b_SynchronizationTransactionStarted bit
-- Initialize local variables
select
@i_errorcode = 0
,@i_return = 0
,@v_newline = char(10)
,@i_TotalDeletedRowCount = 0
,@i_TotalInsertedRowCount = 0
,@i_TotalUpdatedRowCount = 0
,@i_TotalRowsSynchronized = 0
,@v_Line = replicate('*',80)
,@v_Line1 = replicate('-',32)
,@i_LogColumnWidth = 30
,@d_StartDate = getdate()
,@v_LinkedServerNameofDestinationDatabase = @@servername
,@v_DestinationDatabaseName = db_name()
,@b_SynchronizationTransactionStarted = 0
-- Derive the fully qualified name of the Database
select
@v_FullyQualifiedSourceDatabaseName = '[' + @p_LinkedServerNameofSourceDatabase + '].['+ @p_SourceDatabaseName + ']' + '.dbo.'
,@v_FullyQualifiedDestinationDatabaseName = '[' + @v_LinkedServerNameofDestinationDatabase + '].['+ @v_DestinationDatabaseName + ']' + '.dbo.'
-- start synchronization transaction only if the program mode is "Table Compare & Data Synchronization" or "Data Synchronization only"
if @p_ProgramMode >= 0
begin
-- Start Synchronization transaction
begin transaction Synchronization
set @b_SynchronizationTransactionStarted = 1
end
-- Loop through all the table names in the 'Comma separated list of tables to be synchronized' input parameter
while 1=1
begin
select
@i_CommaPosition = patindex('%,%',@p_TablesToBeSynchronized)
,@i_StringLength = len(@p_TablesToBeSynchronized)
,@i_StartPosition = 1
select
@i_endPosition = case when @i_CommaPosition = 0 then @i_StringLength else @i_CommaPosition - 1 end
-- Derive the Table Name to be synchronized
select
@v_TableName = substring(@p_TablesToBeSynchronized,@i_StartPosition,@i_endPosition)
if @v_TableName is not null
begin
-- Derive the column list
exec stp_DTS_GetCommaSeperatedColumnString
@v_TableName
,@v_ColumnList out
,@v_ColumnlistWithoutTimeStampDataType out
,@v_ColumnStringWithCastingLOBToLargeValueDataType out
,@v_ColumnStringWithCastingLargeValueToLOBDataType out
-- Perform Table Compare
if @p_ProgramMode <= 0
begin
select
@v_FullyQualifiedSourceTableName = @v_FullyQualifiedSourceDatabaseName + @v_TableName
,@v_FullyQualifiedDestinationTableName = @v_FullyQualifiedDestinationDatabaseName + @v_TableName
set @v_sql = 'select ''' + @v_FullyQualifiedSourceTableName + ''' as TableName, ' + @v_ColumnStringWithCastingLOBToLargeValueDataType +
' from ' + @v_FullyQualifiedSourceTableName + ' union all select ''' + @v_FullyQualifiedDestinationTableName + ''' as TableName, ' +
@v_ColumnStringWithCastingLOBToLargeValueDataType + ' from ' + @v_FullyQualifiedDestinationTableName
set @v_sql = 'select max(TableName) as [TableName : ' + @v_TableName + '], ' + @v_ColumnStringWithCastingLargeValueToLOBDataType +
' from (' + @v_sql + ') a group by ' + @v_ColumnList +
' having count(*) = 1'
exec sp_executesql @v_sql
-- If an error is encountered, go to the error handler to roll back the transaction
select
@i_errorcode = @@error
if @i_errorcode != 0
goto ErrorHandler
-- Jump to next iteration if the program mode is Table compare only
if @p_ProgramMode < 0
goto NextIteration
end
-- Populate the Identity Or Primary Key Column Details
exec stp_DTS_GetIdentityOrPrimaryKeyColumnDetails
@v_TableName
,@v_EquiJoinString OUT
,@v_StructureString OUT
,@v_DeletedString OUT
,@v_InsertedString OUT
,@v_TmpTableColumnListString OUT
,@v_IdentityOrPrimaryKeyColumnString out
,@b_IsTableHavingIdentityColumn out
-- Step 1: Records to be deleted from Destination database table
--Select the records that do not exist in the Source database table, but exist in the Destination database table
--Then delete them from the Destination database table
-- Start constructing the dynamic sql required for Step 1
set @v_sql ='delete from' + ' '
+ @v_TableName + ' '
+ 'where not exists' + ' '
+ '(select 1 from' + ' '
+ @v_FullyQualifiedSourceDatabaseName + @v_TableName + ' '
+ 'as SrcDBTable' + ' '
+ 'where' + ' '
+ @v_EquiJoinString + ' '
+ ')'
-- Execute the constructed dynamic sql for Step 1
exec sp_executesql @v_sql
-- If an error is encountered, go to the error handler to roll back the transaction
select
@i_errorcode = @@error
if @i_errorcode != 0
goto ErrorHandler
-- Step 2. Records to be inserted into the Destination database table
-- Select the records that exist in the Source database table but do not exist in the Destination database table,
-- then insert them into the Destination database table
-- Start constructing the dynamic sql required for Step 2.
-- Add 'set identity_insert on' before the insert statement if the table has an Identity column
if @b_IsTableHavingIdentityColumn = 1
set @v_sql = 'set identity_insert' + ' '
+ @v_TableName + ' '
+ 'on'
else
set @v_sql = ''
set @v_sql =@v_sql + ' '
+ 'insert into' + ' '
+ @v_TableName + ' '
+ '(' + @v_ColumnlistWithoutTimeStampDataType + ')' + ' '
+ 'select' + ' '
+ @v_ColumnlistWithoutTimeStampDataType + ' '
+ 'from' + ' '
+ @v_FullyQualifiedSourceDatabaseName + @v_TableName + ' '
+ 'as SrcDBTable' + ' '
+ 'where not exists ' + ' '
+ '(select 1 from' + ' '
+ @v_TableName + ' '
+ 'where' + ' '
+ @v_EquiJoinString + ' '
+ ')'
-- Add 'set identity_insert off' after the insert statement if the table has an Identity column
if @b_IsTableHavingIdentityColumn = 1
set @v_sql = @v_sql + ' '
+ 'set identity_insert' + ' '
+ @v_TableName + ' '
+ 'off'
-- Execute the constructed dynamic sql for Step 2
exec sp_executesql @v_sql
-- If an error is encountered, go to the error handler to roll back the transaction
select
@i_errorcode = @@error
if @i_errorcode != 0
goto ErrorHandler
-- Step 3. Records to be updated in the Destination database table
-- Select the records that differ between the Source and Destination database tables,
-- then update them in the Destination database table with the Source database table data
-- Populate the set string
exec stp_DTS_SetDestinationColumnWithSourceColumnString
@v_TableName
,@v_SetDestinationColumnWithSourceColumnString out
-- If the table has no non-key columns, Step 3 is not needed.
if isnull(@v_SetDestinationColumnWithSourceColumnString,'') = ''
begin
set @i_UpdatedRowCount = 0
goto NextIteration
end
-- Start constructing dynamic sql required for Step 3.
set @v_sql ='update' + ' '
+ @v_TableName + ' '
+ 'set' + ' '
+ @v_SetDestinationColumnWithSourceColumnString + ' '
+ 'from' + ' '
+ @v_TableName + ' '
+',' + ' '
+ '(select' + ' '
+ 'max(TableName) as TableName,' + ' '
+ @v_ColumnStringWithCastingLargeValueToLOBDataType + ' '
+ 'from' + ' '
+ '(' + ' '
+ 'select' + ' '
+ '''SourceDBTableName'' as TableName,' + ' '
+ @v_ColumnStringWithCastingLOBToLargeValueDataType + ' '
+ 'from' + ' '
+ @v_FullyQualifiedSourceDatabaseName + @v_TableName + ' '
+ 'union all' + ' '
+ 'select' + ' '
+ '''DestinationTableName'' as TableName,' + ' '
+ @v_ColumnStringWithCastingLOBToLargeValueDataType + ' '
+ 'from' + ' '
+ @v_TableName + ' '
+ ') tmp' + ' '
+ 'group by' + ' '
+ @v_ColumnList + ' '
+ 'having' + ' '
+ 'count(*) = 1 and max(TableName) = ''SourceDBTableName''' + ' '
+ ')' + ' '
+ 'as SrcDBTable' + ' '
+ 'where' + ' '
+ @v_EquiJoinString
-- Execute the constructed dynamic sql for Step 3
exec sp_executesql @v_sql
-- If an error is encountered, go to the error handler to roll back the transaction
select
@i_errorcode = @@error
if @i_errorcode != 0
goto ErrorHandler
end
-- Derive the next iteration values
-- If no further iteration is required then exit the loop
NextIteration:
if @i_CommaPosition = 0
begin
-- Break the loop if no more tables exist to synchronize
break
end
else
begin
-- Derive the remaining Tables to be synchronized
select
@p_TablesToBeSynchronized = substring(@p_TablesToBeSynchronized, @i_CommaPosition + 1, @i_StringLength)
end
end
-- If the program mode is "Table Compare & Data Synchronization" or "Data Synchronization only"
-- then commit the synchronization transaction and log the remaining
if @p_ProgramMode >= 0
begin
-- Commit the synchronization transaction as no error occurred
commit transaction Synchronization
end
-- Return success
return 0
-- Error Handler
ErrorHandler:
begin
-- check for error
if ( @i_errorcode <= 0 )
begin
raiserror(@v_ErrorMessage, 16, 1)
select @i_return = -100
end
else if ( @i_errorcode > 0 )
begin
select @i_return = -100
end
-- Roll back the synchronization transaction if it was started and an error occurred
if @b_SynchronizationTransactionStarted = 1
rollback transaction Synchronization
-- Return Failure
return @i_return
end
end
go
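To see the compare query and the three synchronization steps outside the dynamic SQL, here is a minimal sketch that runs against two temporary tables. The table names (#Src, #Dst), the columns (Id, Name) and the sample rows are assumptions made up for illustration; the equi-join on Id stands in for whatever @v_EquiJoinString contains for a real table, and identity and LOB handling are left out.

```sql
-- Hypothetical source and destination tables (illustrative names only)
create table #Src (Id int not null primary key, Name varchar(50) not null);
create table #Dst (Id int not null primary key, Name varchar(50) not null);

insert into #Src (Id, Name) values (1, 'Ann');
insert into #Src (Id, Name) values (2, 'Bob');
insert into #Src (Id, Name) values (3, 'Cy');

insert into #Dst (Id, Name) values (1, 'Ann');    -- identical in both tables
insert into #Dst (Id, Name) values (2, 'Bobby');  -- differs from the source
insert into #Dst (Id, Name) values (4, 'Dee');    -- missing from the source

-- Table compare: rows present on one side only, or different between the sides
select max(TableName) as TableName, Id, Name
from (select 'Src' as TableName, Id, Name from #Src
      union all
      select 'Dst' as TableName, Id, Name from #Dst) a
group by Id, Name
having count(*) = 1;

-- Step 1: delete destination rows that have no match in the source
delete from #Dst
where not exists (select 1 from #Src as SrcDBTable where SrcDBTable.Id = #Dst.Id);

-- Step 2: insert source rows that have no match in the destination
insert into #Dst (Id, Name)
select Id, Name
from #Src as SrcDBTable
where not exists (select 1 from #Dst where SrcDBTable.Id = #Dst.Id);

-- Step 3: update destination rows whose non-key columns differ from the source
update #Dst
set Name = SrcDBTable.Name
from #Dst,
     (select max(TableName) as TableName, Id, Name
      from (select 'SourceDBTableName' as TableName, Id, Name from #Src
            union all
            select 'DestinationTableName' as TableName, Id, Name from #Dst) tmp
      group by Id, Name
      having count(*) = 1 and max(TableName) = 'SourceDBTableName') as SrcDBTable
where SrcDBTable.Id = #Dst.Id;

-- Re-running the compare returns no rows once the tables have converged
select max(TableName) as TableName, Id, Name
from (select 'Src' as TableName, Id, Name from #Src
      union all
      select 'Dst' as TableName, Id, Name from #Dst) a
group by Id, Name
having count(*) = 1;

drop table #Src;
drop table #Dst;
```

With this sample data the first compare lists four rows (Id 2 from each side, Id 3 from the source, Id 4 from the destination). Step 1 removes Id 4, Step 2 adds Id 3, and Step 3 changes the name for Id 2 to the source value.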