Transaction Log: Tracking Inserts, Updates, and Deletes Over Time
Published: May 27, 2016
Have you ever wanted something that will reliably keep you informed as to the number of Inserts, Updates, or Deletes against tables in your system? Maybe to keep an eye on the volume of modifications happening in a replicated environment?

If you put some effort in then this is something that can be done via the Transaction Log.

One thing I would note, though, is that reading the transaction log can be heavy on a system and can slow down things such as log backups, so I wouldn't recommend this as a permanent tracker by any means.

However, if you want to check on your system over a specific period, or you have performance to spare and simply want to profile for a few days to get a baseline or spot patterns… then this could help you out.

Effectively this builds on my last post, in which I showed how to see Inserts, Updates, and Deletes using the transaction log of your database. What I've done now is extend that a little in order to track these over time, whilst doing my best to avoid any real impact on the server.

This will allow you to profile replication volumes, spot loading or update patterns, or simply track down rogue data changes you didn’t even know were happening.

The easiest way I can think of to present this is simply to offer the code with a good deal of annotations, rather than trying to explain what's going on and then providing the code separately…

Note that this provides the framework… you would have to place this into a scheduled task in order to profile data changes over time…

/*
   I'm using a test database called testDB...  use your own database as required.
   But note that I use "testDB.dbo." further in the script so you'll need a find
   and replace.
*/
use testDB
go

/*
   Create a LogTrack table.
   The purpose of this is to log the max LSN we encounter when reading the log.
   This means that subsequent reads of the log can be massively restricted so that
   we don't duplicate data and also so that we read a small portion of the log,
   therefore reducing the impact of fn_dbLog
*/

if object_id('dbo.transactionLogTrack') is null
begin
   create table dbo.transactionLogTrack
   (
      databaseName varchar(100),
      lsn varchar(250),
      endDate datetime
   )
end

/*
   This is the main reporting table which collates the transaction activity
   in the database and allows for reporting.
*/

if object_id('dbo.transactionActivity') is null
begin
   create table dbo.transactionActivity
   (
      databaseName varchar(50),
      tableName varchar(100),
      hourFrom smalldatetime,
      hourTo smalldatetime,
      -- heap or clustered index changes logged separately
      tableInserted int,
      tableUpdated int,
      tableDeleted int,
      -- changes to nonclustered indexes
      indexInserted int,
      indexUpdated int,
      indexDeleted int,
      -- total changes by type
      rowsInserted int,
      rowsUpdated int,
      rowsDeleted int,
      -- total changes to the table
      totalChanges int,
      -- is the table in replication or not
      isReplicated bit
   )
end

-- This is the database I want to track
use AdventureWorks2012
go

declare @lsnStart varchar(250)

/*
Look for the last LSN read for this database
This limits the impact on the transaction log
*/
select @lsnStart = lsn
from testDB.dbo.transactionLogTrack
where databaseName = db_name()

/*
   First time this is run we need to insert dummy values
   so that the whole transaction log is read this time
*/
if @lsnStart is null
begin
   insert into testDB.dbo.transactionLogTrack
   select db_name(), '0', '2000-01-01'
end

if @lsnStart = '0'
   select @lsnStart = null

/*
   Collect transaction log data for the specific items we're looking for,
   only going as far back as the last read LSN to reduce load
*/
declare @partialLog table
(
   lsn varchar(50),
   tranID varchar(50),
   endTime smalldatetime,
   allocUnitID bigint,
   operation varchar(250),
   context varchar(250)
)
insert into @partialLog
select [Current LSN], [transaction ID] tranID, [end time] endTime, AllocUnitId, operation, Context
from ::fn_dbLog(@lsnStart, null) -- here we specify how far back in the log we want to look
where (operation in ('LOP_INSERT_ROWS', 'LOP_MODIFY_ROW', 'LOP_DELETE_ROWS')
   and context not in ('LCX_PFS', 'LCX_IAM'))
or operation = 'LOP_COMMIT_XACT'

/*
Collate the records into a temp table
*/
declare @logRecords table
(
   databaseName varchar(100),
   tableName varchar(250),
   hourFrom smalldatetime,
   hourTo as dateadd(hh, 1, hourFrom),
   tableInserted int,
   tableUpdated int,
   tableDeleted int,
   indexInserted int,
   indexUpdated int,
   indexDeleted int,
   -- no point doing totals separately so we're using calculated columns
   totalInserted as tableInserted + indexInserted,
   totalUpdated as tableUpdated + indexUpdated,
   totalDeleted as tableDeleted + indexDeleted,
   totalChanges as tableInserted + indexInserted + tableUpdated + indexUpdated + tableDeleted + indexDeleted,
   isReplicated bit default(0)
)
;
with splitResult as
(
   select db_name() databaseName, dateadd(hh, datepart(hh, x.endTime), convert(varchar, left(x.endTime, 11))) endTime, o.name objectName,
      -- if index_id is 0 or 1 then it's the table itself (heap or clustered)
      case when i.index_id in (0, 1) and operation = 'LOP_INSERT_ROWS' then 1 else 0 end tableInsert,
      case when i.index_id in (0, 1) and operation = 'LOP_MODIFY_ROW' then 1 else 0 end tableUpdate,
      case when i.index_id in (0, 1) and operation = 'LOP_DELETE_ROWS' then 1 else 0 end tableDelete,
      -- an index_id > 1 is a nonclustered index
      case when i.index_id > 1 and operation = 'LOP_INSERT_ROWS' then 1 else 0 end indexInsert,
      case when i.index_id > 1 and operation = 'LOP_MODIFY_ROW' then 1 else 0 end indexUpdate,
      case when i.index_id > 1 and operation = 'LOP_DELETE_ROWS' then 1 else 0 end indexDelete
   from @partialLog t
   join sys.system_internals_allocation_units siau
      on t.allocUnitID = siau.allocation_unit_id
   join sys.partitions p
      on siau.container_id = p.partition_id
   join sys.indexes i
      on p.index_id = i.index_id
      and p.object_id = i.object_id
   join sys.objects o
      on p.object_id = o.object_id
   -- cross apply to get the transaction end date applied to all records
   cross apply
   (
      select endTime
      from @partialLog
      where tranID = t.tranID
      and endTime is not null
   ) x
   where o.is_ms_shipped = 0
   and t.endTime is null
),
addTotals as
(
   -- collate into totals
   select databaseName, endTime hourFrom, objectName,
      sum(tableInsert) tableInserts, sum(tableUpdate) tableUpdates, sum(tableDelete) tableDeletes,
      sum(indexInsert) indexInserts, sum(indexUpdate) indexUpdates, sum(indexDelete) indexDeletes
   from splitResult
   group by databaseName, endTime, objectName
)
insert into @logRecords(databaseName, tableName, hourFrom, tableInserted, tableDeleted, tableUpdated, indexInserted, indexUpdated, indexDeleted)
select databaseName, objectName, hourFrom, tableInserts, tableDeletes, tableUpdates, indexInserts, indexUpdates, indexDeletes
from addTotals

declare @endTime smalldatetime, @lsn varchar(50)

select @lsn = max(lsn), @endTime = max(endTime)
from @partialLog

-- format the LSN ready to insert into the LogTrack table
select @lsn = cast(cast(convert(varbinary, left(@lsn, 8), 2) as int) as varchar) +
   right('0000000000' + cast(cast(convert(varbinary, substring(@lsn, 10, 8), 2) as int) as varchar), 10) +
   right('00000' + cast(cast(convert(varbinary, right(@lsn, 4), 2) as int) as varchar), 5)

-- insert our start LSN for next run into the log track table
update testDB.dbo.transactionLogTrack
set lsn = isnull(@lsn, 0), endDate = isnull(@endTime, '2001-01-01')
where databaseName = db_name()

-- check to see if this database is replicated by checking for sysarticles
if exists
(
   select *
   from sys.objects
   where name = 'sysarticles'
)
begin
   -- if replicated then flag the relevant tables
   update n
   set isReplicated = 1
   from @logRecords n
   join sysarticles a
      on n.tableName = a.name
      and n.hourFrom >=
      (
         select min(hourFrom)
         from @logRecords
      )
   where isReplicated = 0
end
;
-- merge the overall results into our tracking table
merge testDB.dbo.transactionActivity t
using @logRecords l
on
(
   t.databaseName = l.databaseName
   and t.tableName = l.tableName
   and t.hourFrom = l.hourFrom
)
when matched then
   update
   set t.tableInserted = t.tableInserted + l.tableInserted,
      t.tableUpdated = t.tableUpdated + l.tableUpdated,
      t.tableDeleted = t.tableDeleted + l.tableDeleted,
      t.indexInserted = t.indexInserted + l.indexInserted,
      t.indexUpdated = t.indexUpdated + l.indexUpdated,
      t.indexDeleted = t.indexDeleted + l.indexDeleted,
      -- keep the totals in step with the individual counts
      t.rowsInserted = t.rowsInserted + l.totalInserted,
      t.rowsUpdated = t.rowsUpdated + l.totalUpdated,
      t.rowsDeleted = t.rowsDeleted + l.totalDeleted,
      t.totalChanges = t.totalChanges + l.totalChanges,
      t.isReplicated = l.isReplicated
when not matched then
   insert (databaseName, tableName, hourFrom, hourTo, tableInserted, tableUpdated, tableDeleted, indexInserted, indexUpdated, indexDeleted, rowsInserted, rowsUpdated, rowsDeleted, totalChanges, isReplicated)
   values (databaseName, tableName, hourFrom, hourTo, tableInserted, tableUpdated, tableDeleted, indexInserted, indexUpdated, indexDeleted, totalInserted, totalUpdated, totalDeleted, totalChanges, isReplicated)
;
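As a side note on the LSN formatting just above: fn_dbLog displays each LSN as three colon-separated hex parts, but its start parameter expects a single decimal number built from those parts, with the second part zero-padded to 10 digits and the third to 5. A small illustrative example (the LSN value here is made up):

```sql
-- illustrative only: converting a displayed hex LSN to the decimal
-- form that fn_dbLog's start parameter expects
declare @displayLSN varchar(50) = '0000002d:000007e8:0003'

select cast(cast(convert(varbinary, left(@displayLSN, 8), 2) as int) as varchar) +
   right('0000000000' + cast(cast(convert(varbinary, substring(@displayLSN, 10, 8), 2) as int) as varchar), 10) +
   right('00000' + cast(cast(convert(varbinary, right(@displayLSN, 4), 2) as int) as varchar), 5)
-- 0x2d = 45, 0x7e8 = 2024, 0x3 = 3, so this returns '45000000202400003'
```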


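Once the collector has run for a while, spotting those loading and update patterns is just an aggregate query over the tracking table. As a sketch (assuming the table and database names used above), the busiest tables over the last day could be pulled like this:

```sql
-- hypothetical reporting query against the tracking table built above
select tableName,
   sum(totalChanges) totalChanges,
   sum(rowsInserted) inserts,
   sum(rowsUpdated) updates,
   sum(rowsDeleted) deletes
from testDB.dbo.transactionActivity
where databaseName = 'AdventureWorks2012'
and hourFrom >= dateadd(hh, -24, getdate())
group by tableName
order by sum(totalChanges) desc
```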
If, for any reason, you want to place this in a job and therefore need it to run across all databases, then that can be found below…

use testDB
go

if object_id('dbo.transactionLogTrack') is null
begin
   create table dbo.transactionLogTrack
   (
      databaseName varchar(100),
      lsn varchar(250),
      endDate datetime
   )
end

if object_id('dbo.transactionActivity') is null
begin
   create table dbo.transactionActivity
   (
      databaseName varchar(50),
      tableName varchar(100),
      hourFrom smalldatetime,
      hourTo smalldatetime,
      tableInserted int,
      tableUpdated int,
      tableDeleted int,
      indexInserted int,
      indexUpdated int,
      indexDeleted int,
      rowsInserted int,
      rowsUpdated int,
      rowsDeleted int,
      totalChanges int,
      isReplicated bit
   )
end

declare @databaseList table
(
   id tinyint identity(1, 1) not null,
   databaseName varchar(50) not null
)
insert into @databaseList
select name
from sys.databases
where name not in ('master', 'msdb', 'model', 'tempDB', 'distribution', 'ReportServer', 'ReportServerTempDB')
and is_read_only = 0
-- note this requires databases to be in the server collation.
and collation_name = serverproperty('Collation')
-- add other criteria as appropriate

declare @counter tinyint = 1, @sql nvarchar(max)

set nocount on

while @counter <= (select max(id) from @databaseList)
begin
   select @sql = 'use ' + databaseName + '
declare @lsnStart varchar(250)

select @lsnStart = lsn
from testDB.dbo.transactionLogTrack
where databaseName = db_name()

if @lsnStart is null
begin
insert into testDB.dbo.transactionLogTrack
select db_name(), ''0'', ''2000-01-01''
end

if @lsnStart = ''0''
select @lsnStart = null

declare @partialLog table
(
lsn varchar(50),
tranID varchar(50),
endTime smalldatetime,
allocUnitID bigint,
operation varchar(250),
context varchar(250)
)
insert into @partialLog
select [Current LSN], [transaction ID] tranID, [end time] endTime, AllocUnitId, operation, Context
from ::fn_dbLog(@lsnStart, null)
where (operation in (''LOP_INSERT_ROWS'', ''LOP_MODIFY_ROW'', ''LOP_DELETE_ROWS'')
and context not in (''LCX_PFS'', ''LCX_IAM''))
or operation = ''LOP_COMMIT_XACT''

declare @logRecords table
(
databaseName varchar(100),
tableName varchar(250),
hourFrom smalldatetime,
hourTo as dateadd(hh, 1, hourFrom),
tableInserted int,
tableUpdated int,
tableDeleted int,
indexInserted int,
indexUpdated int,
indexDeleted int,
totalInserted as tableInserted + indexInserted,
totalUpdated as tableUpdated + indexUpdated,
totalDeleted as tableDeleted + indexDeleted,
totalChanges as tableInserted + indexInserted + tableUpdated + indexUpdated + tableDeleted + indexDeleted,
isReplicated bit default(0)
)
;
with splitResult as
(
select db_name() databaseName, dateadd(hh, datepart(hh, x.endTime), convert(varchar, left(x.endTime, 11))) endTime, o.name objectName,
case when i.index_id in (0, 1) and operation = ''LOP_INSERT_ROWS'' then 1 else 0 end tableInsert,
case when i.index_id in (0, 1) and operation = ''LOP_MODIFY_ROW'' then 1 else 0 end tableUpdate,
case when i.index_id in (0, 1) and operation = ''LOP_DELETE_ROWS'' then 1 else 0 end tableDelete,
case when i.index_id > 1 and operation = ''LOP_INSERT_ROWS'' then 1 else 0 end indexInsert,
case when i.index_id > 1 and operation = ''LOP_MODIFY_ROW'' then 1 else 0 end indexUpdate,
case when i.index_id > 1 and operation = ''LOP_DELETE_ROWS'' then 1 else 0 end indexDelete
from @partialLog t
join sys.system_internals_allocation_units siau
on t.AllocUnitId = siau.allocation_unit_id
join sys.partitions p
on siau.container_id = p.partition_id
join sys.indexes i
on p.index_id = i.index_id
and p.object_id = i.object_id
join sys.objects o
on p.object_id = o.object_id
cross apply
(
select endTime
from @partialLog
where tranID = t.tranID
and endTime is not null
) x
where o.is_ms_shipped = 0
and t.endTime is null
), addTotals as
(
select databaseName, endTime hourFrom, objectName,
sum(tableInsert) tableInserts, sum(tableUpdate) tableUpdates, sum(tableDelete) tableDeletes,
sum(indexInsert) indexInserts, sum(indexUpdate) indexUpdates, sum(indexDelete) indexDeletes
from splitResult
group by databaseName, endTime, objectName
)
insert into @logRecords(databaseName, tableName, hourFrom, tableInserted, tableDeleted, tableUpdated, indexInserted, indexUpdated, indexDeleted)
select databaseName, objectName, hourFrom, tableInserts, tableDeletes, tableUpdates, indexInserts, indexUpdates, indexDeletes
from addTotals

declare @endTime smalldatetime, @lsn varchar(50)

select @lsn = max(lsn), @endTime = max(endTime)
from @partialLog

-- format the LSN ready to insert into the LogTrack table
select @lsn = cast(cast(convert(varbinary, left(@lsn, 8), 2) as int) as varchar) +
right(''0000000000'' + cast(cast(convert(varbinary, substring(@lsn, 10, 8), 2) as int) as varchar), 10) +
right(''00000'' + cast(cast(convert(varbinary, right(@lsn, 4), 2) as int) as varchar), 5)

update testDB.dbo.transactionLogTrack
set lsn = isnull(@lsn, 0), endDate = isnull(@endTime, ''2001-01-01'')
where databaseName = db_name()

if exists
(
select *
from sys.objects
where name = ''sysarticles''
)
begin
update n
set isReplicated = 1
from @logRecords n
join sysarticles a
on n.tableName = a.name
and n.hourFrom >=
(
select min(hourFrom)
from @logRecords
)
where isReplicated = 0
end
;
merge testDB.dbo.transactionActivity t
using @logRecords l
on
(
t.databaseName = l.databaseName
and t.tableName = l.tableName
and t.hourFrom = l.hourFrom
)
when matched then
update
set t.tableInserted = t.tableInserted + l.tableInserted,
t.tableUpdated = t.tableUpdated + l.tableUpdated,
t.tableDeleted = t.tableDeleted + l.tableDeleted,
t.indexInserted = t.indexInserted + l.indexInserted,
t.indexUpdated = t.indexUpdated + l.indexUpdated,
t.indexDeleted = t.indexDeleted + l.indexDeleted,
t.rowsInserted = t.rowsInserted + l.totalInserted,
t.rowsUpdated = t.rowsUpdated + l.totalUpdated,
t.rowsDeleted = t.rowsDeleted + l.totalDeleted,
t.totalChanges = t.totalChanges + l.totalChanges,
t.isReplicated = l.isReplicated
when not matched then
insert (databaseName, tableName, hourFrom, hourTo, tableInserted, tableUpdated, tableDeleted, indexInserted, indexUpdated, indexDeleted, rowsInserted, rowsUpdated, rowsDeleted, totalChanges, isReplicated)
values (databaseName, tableName, hourFrom, hourTo, tableInserted, tableUpdated, tableDeleted, indexInserted, indexUpdated, indexDeleted, totalInserted, totalUpdated, totalDeleted, totalChanges, isReplicated)
;
'
from @databaseList
where id = @counter

exec sp_executeSQL @sql

select @counter += 1
end
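To actually run either version on a schedule, one option is a SQL Agent job executing hourly. This is only a sketch: the job, schedule, and procedure names are made up, and the step command would be the script above (or, more manageably, a stored procedure wrapping it):

```sql
-- minimal SQL Agent job sketch; all names here are illustrative
use msdb
go
exec dbo.sp_add_job @job_name = 'TransactionLogTracker'
exec dbo.sp_add_jobstep @job_name = 'TransactionLogTracker',
   @step_name = 'Collect log activity',
   @subsystem = 'TSQL',
   @database_name = 'testDB',
   @command = 'exec dbo.usp_trackTransactionLog -- hypothetical wrapper proc'
exec dbo.sp_add_jobschedule @job_name = 'TransactionLogTracker',
   @name = 'Hourly',
   @freq_type = 4,          -- daily
   @freq_interval = 1,
   @freq_subday_type = 8,   -- repeat in units of hours
   @freq_subday_interval = 1
exec dbo.sp_add_jobserver @job_name = 'TransactionLogTracker'
```

Running hourly keeps each fn_dbLog read small, which is the whole point of tracking the last LSN between runs.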
Kevin Urquhart
SQL Server Consultant, London

I am a SQL Server DBA, Architect, Developer, and Trainer for SQL Training. This is my blog, in which I'm simply trying to share my SQL knowledge and experiences with the world.
