文章转载于: https://www.cnblogs.com/marklove/p/9206838.html
AtomicCmpExchange(Target, Exchange, Comparand)函数的作用
Target 与 Comparand 值进行比较,如果两个相等则把Exchange赋值给Target。函数返回为Target传入之前的值。
高效读写锁
一个高效读写锁,可实现多个线程读一个线程写的锁,应该比Delphi自带的读写锁高效,本人没有做对比测试。
本文的锁不可以在一个线程里重入,否则会锁死,另外读写锁最多支持65535个线程同时读。
// 跨平台简易高效锁
unit utLocker;
interface
type
// 多读单写锁
// 1.写的时候阻塞其他所有写和读
// 2.读的时候不阻塞其他读,但阻塞所有写,当阻塞了一个或以上的写后,将阻塞所有后来新的读
TMultiReadSingleWriteLocker = class
protected
[Volatile]
FLocker: Integer;
public
procedure LockRead;
procedure UnLockRead; inline;
procedure LockWrite;
procedure UnLockWrite; inline;
function TryLockRead: Boolean; inline;
function TryLockWrite: Boolean; inline;
constructor Create;
end;
TSimpleLocker = class
protected
[Volatile]
FLocker: Integer;
public
procedure Lock;
procedure UnLock; inline;
function TryLock: Boolean; inline;
end;
implementation
uses System.SyncObjs, System.SysUtils, System.Classes;
type
TSpinWait = record
private const
YieldThreshold = 10;
Sleep1Threshold = 20;
Sleep0Threshold = 5;
private
FCount: Integer;
function GetNextSpinCycleWillYield: Boolean; inline;
public
procedure Reset;inline;
procedure SpinCycle;inline;
property Count: Integer read FCount;
property NextSpinCycleWillYield: Boolean read GetNextSpinCycleWillYield;
end;
{ TSpinWait }
function TSpinWait.GetNextSpinCycleWillYield: Boolean;
begin
Result := (FCount > YieldThreshold) or (CPUCount = 1);
end;
procedure TSpinWait.Reset;
begin
FCount := 0;
end;
procedure TSpinWait.SpinCycle;
var
SpinCount: Integer;
begin
if NextSpinCycleWillYield then
begin
if FCount >= YieldThreshold then
SpinCount := FCount - YieldThreshold
else
SpinCount := FCount;
if SpinCount mod Sleep1Threshold = Sleep1Threshold - 1 then
TThread.Sleep(1)
else if SpinCount mod Sleep0Threshold = Sleep0Threshold - 1 then
TThread.Sleep(0)
else
TThread.Yield;
end
else
TThread.SpinWait(4 shl FCount);
Inc(FCount);
if FCount < 0 then
FCount := YieldThreshold + 1;
end;
{ TMultiReadSingleWriteLocker }
procedure TMultiReadSingleWriteLocker.LockRead;
var
CurLock: Integer;
Wait: TSpinWait;
begin
Wait.Reset;
while True do
begin
CurLock := FLocker;
if CurLock <= $FFFF then
begin
if TInterlocked.CompareExchange(FLocker, CurLock + 1, CurLock) = CurLock
then
Exit;
end;
Wait.SpinCycle;
end;
end;
procedure TMultiReadSingleWriteLocker.LockWrite;
var
CurLock: Integer;
Wait: TSpinWait;
begin
Wait.Reset;
while True do
begin
CurLock := FLocker;
if CurLock <= $FFFF then
begin
if TInterlocked.CompareExchange(FLocker, CurLock + $10000, CurLock) = CurLock
then
Exit;
end;
Wait.SpinCycle;
end;
end;
function TMultiReadSingleWriteLocker.TryLockRead: Boolean;
var
CurLock: Integer;
begin
CurLock := FLocker;
if CurLock <= $FFFF then
Result := TInterlocked.CompareExchange(FLocker, CurLock + 1, CurLock)
= CurLock
else
Result := False;
end;
function TMultiReadSingleWriteLocker.TryLockWrite: Boolean;
var
CurLock: Integer;
begin
CurLock := FLocker;
if CurLock <= $FFFF then
Result := TInterlocked.CompareExchange(FLocker, CurLock + $10000, CurLock)
= CurLock
else
Result := False;
end;
procedure TMultiReadSingleWriteLocker.UnLockWrite;
begin
if FLocker < $10000 then
raise Exception.Create('TMultiReadSingleWriteLocker Error');
TInterlocked.Add(FLocker, -$10000);
end;
procedure TMultiReadSingleWriteLocker.UnLockRead;
begin
TInterlocked.Decrement(FLocker);
end;
constructor TMultiReadSingleWriteLocker.Create;
begin
FLocker := 0;
end;
{ TSimpleLocker }
procedure TSimpleLocker.Lock;
var
Wait: TSpinWait;
begin
Wait.Reset;
while True do
begin
if FLocker = 0 then
begin
if TInterlocked.CompareExchange(FLocker, 1, 0) = 0 then
Exit;
end;
Wait.SpinCycle;
end;
end;
function TSimpleLocker.TryLock: Boolean;
begin
if FLocker = 0 then
begin
Result := TInterlocked.CompareExchange(FLocker, 1, 0) = 0;
end
else
Result := False;
end;
procedure TSimpleLocker.UnLock;
begin
if TInterlocked.CompareExchange(FLocker, 0, 1) <> 1 then
raise Exception.Create('TSimpleLocker Error');
end;
end.
一个简易无锁池
一个简易 无锁池
1.所有读写无等待,不需要判断条件直接读写(除自动扩充容量时),效率是一般带锁或带条件判断池的两倍以上。
2.预先开辟2的幂大小容量,可自增,每次翻倍
3.仅提供思路,工程应用可靠性还不确定。
// 无锁池
// 20160228 增加代引用计数器内存块的池,增加编译指令POOLGROW功能,可打开关闭池的自动翻倍增长功能
// 20160225 修正Grow中FWritePtr没有增长Bug
// 20140609 增加Grow临界区,减少等待时间
// 20140608 修正可能存在同时Grow的Bug
unit Iocp.AtomPool;
interface
{ .$DEFINE POOLGROW }
Uses
System.SysUtils,
System.SyncObjs;
Type
Int32 = Integer;
UInt32 = Cardinal;
TAtomPoolAbstract = class
private
FWritePtr: Int32;
FReadPtr: Int32;
FHighBound: UInt32;
FData: array of Pointer;
{$IFDEF POOLGROW}
FCs: TCriticalSection;
FLock: Int32;
procedure CheckGrow; inline;
procedure Grow; inline;
{$ENDIF}
Protected
function AllocItemResource: Pointer; virtual; abstract;
procedure FreeItemResource(Item: Pointer); virtual; abstract;
function GetCapacity: UInt32;
procedure FreeResources;
Public
procedure AllocResources;
function Get: Pointer;
procedure Put(Item: Pointer);
Constructor Create(Capacity: UInt32); Virtual;
Destructor Destroy; Override;
property Capacity: UInt32 read GetCapacity;
End;
TAtomPoolMem4K = class(TAtomPoolAbstract)
function AllocItemResource: Pointer; override;
procedure FreeItemResource(Item: Pointer); override;
end;
// 内存块带引用计数器的池,池容量恒定不能增长
TAtomMemoryPoolRef = class
private
FMemory: PByteArray;
FWritePtr: Int32;
FReadPtr: Int32;
FHighBound: UInt32;
FMemSize: UInt32;
FData: array of Pointer;
FDataRef: array of Int32;
Protected
function GetCapacity: UInt32;
procedure AllocResources;
procedure FreeResources;
Public
function Get: Pointer;
procedure Put(Item: Pointer);
function IncRef(Item: Pointer): Int32;
function DecRef(var Item: Pointer): Int32;
Constructor Create(Capacity: UInt32; MemSize: UInt32);
Destructor Destroy; Override;
property Capacity: UInt32 read GetCapacity;
property MemSize:UInt32 read FMemSize;
End;
Implementation
const
MAXTHREADCOUNT = 1000; // 从池中申请资源最大线程数
// 创建池,大小必须是2的幂,并且必须大于MAXTHREADCOUNT
Constructor TAtomPoolAbstract.Create(Capacity: UInt32);
var
OK: Boolean;
Begin
Inherited Create;
OK := (Capacity and (Capacity - 1) = 0);
OK := OK and (Capacity > MAXTHREADCOUNT);
if not OK then
raise Exception.Create(Format('池长度必须大于%d并为2的幂', [MAXTHREADCOUNT]));
{$IFDEF POOLGROW}
FCs := TCriticalSection.Create;
{$ENDIF}
FHighBound := Capacity - 1;
FReadPtr := 0;
End;
Destructor TAtomPoolAbstract.Destroy;
Begin
FreeResources;
SetLength(FData, 0);
{$IFDEF POOLGROW}
FCs.Free;
{$ENDIF}
Inherited;
End;
procedure TAtomPoolAbstract.AllocResources;
var
i: UInt32;
begin
try
SetLength(FData, Capacity);
for i := 0 to FHighBound do
FData[i] := AllocItemResource;
except
Raise Exception.Create('池申请内存失败');
end;
end;
procedure TAtomPoolAbstract.FreeResources;
var
i: UInt32;
begin
for i := FHighBound downto 0 do
Self.FreeItemResource(FData[i]);
end;
procedure TAtomPoolAbstract.Put(Item: Pointer);
var
N: UInt32;
begin
{$IFDEF POOLGROW}
CheckGrow;
{$ENDIF}
N := TInterlocked.Increment(FWritePtr);
FData[N and FHighBound] := Item;
end;
Function TAtomPoolAbstract.Get: Pointer;
var
{$IFDEF POOLGROW}
N, M, K: UInt32;
{$ELSE}
N: UInt32;
{$ENDIF}
begin
{$IFDEF POOLGROW}
N := FWritePtr and FHighBound;
M := FReadPtr and FHighBound;
K := (M + MAXTHREADCOUNT) and FHighBound;
if (N > M) and (N < K) then
// if ((N > M) and (N < K)) or ((N < M) and (N > K)) then
begin
Grow
end;
{$ENDIF}
N := TInterlocked.Increment(FReadPtr);
Result := FData[N and FHighBound];
end;
function TAtomPoolAbstract.GetCapacity: UInt32;
begin
Result := FHighBound + 1;
end;
{$IFDEF POOLGROW}
procedure TAtomPoolAbstract.CheckGrow;
begin
if TInterlocked.Add(FLock, 0) > 0 then
begin
while FLock = 1 do
Sleep(0);
FCs.Enter;
FCs.Leave;
end;
end;
procedure TAtomPoolAbstract.Grow;
var
i, N: Integer;
begin
if TInterlocked.CompareExchange(FLock, 1, 0) = 0 then // 加锁
begin
FCs.Enter;
TInterlocked.Increment(FLock);
N := Length(FData);
SetLength(FData, N + N);
for i := N to High(FData) do
FData[i] := AllocItemResource;
TInterlocked.Increment(FLock);
FHighBound := High(FData);
FWritePtr := FHighBound;
FCs.Leave;
TInterlocked.Exchange(FLock, 0);
end
else
CheckGrow;
end;
{$ENDIF}
{ TAtomPoolMem4K }
function TAtomPoolMem4K.AllocItemResource: Pointer;
begin
GetMem(Result, 4096);
end;
procedure TAtomPoolMem4K.FreeItemResource(Item: Pointer);
begin
FreeMem(Item, 4096);
end;
Constructor TAtomMemoryPoolRef.Create(Capacity: UInt32; MemSize: UInt32);
var
OK: Boolean;
Begin
Inherited Create;
OK := (Capacity and (Capacity - 1) = 0);
OK := OK and (Capacity > MAXTHREADCOUNT);
if not OK then
raise Exception.Create(Format('池长度必须大于%d并为2的幂', [MAXTHREADCOUNT]));
if FMemSize and $10 <> 0 then
raise Exception.Create('内存块大小必须是16的倍数');
FMemSize := MemSize;
try
AllocResources;
FHighBound := Capacity - 1;
FWritePtr := FHighBound;
FReadPtr := 0;
except
Raise Exception.Create('池申请内存失败');
end;
End;
function TAtomMemoryPoolRef.DecRef(var Item: Pointer): Int32;
var
N: Integer;
begin
N := (NativeUInt(Item) - NativeUInt(FMemory)) div FMemSize;
if (N>=0) and (N<=FHighBound) then
begin
Result := TInterlocked.Decrement(FDataRef[N]);
if Result = 0 then
begin
Put(Item);
Item := nil;
end;
end
else Result:=-1;
end;
Destructor TAtomMemoryPoolRef.Destroy;
Begin
FreeResources;
Inherited;
End;
procedure TAtomMemoryPoolRef.AllocResources;
var
i: UInt32;
P: PByteArray;
begin
SetLength(FData, Capacity);
SetLength(FDataRef, Capacity);
FillChar(FDataRef[0], Capacity * Sizeof(FDataRef[0]), 0);
GetMem(FMemory, Length(FData) * FMemSize); // 一次申请所有内存
P := FMemory;
for i := 0 to FHighBound do
begin
FData[i] := P;
Inc(P, FMemSize);
end;
end;
procedure TAtomMemoryPoolRef.FreeResources;
begin
FreeMem(FMemory, Length(FData) * FMemSize);
SetLength(FData, 0);
SetLength(FDataRef, 0);
end;
procedure TAtomMemoryPoolRef.Put(Item: Pointer);
var
N: UInt32;
begin
N := TInterlocked.Increment(FWritePtr);
FData[N and FHighBound] := Item;
end;
Function TAtomMemoryPoolRef.Get: Pointer;
var
N: UInt32;
begin
N := TInterlocked.Increment(FReadPtr);
Result := FData[N and FHighBound];
end;
function TAtomMemoryPoolRef.GetCapacity: UInt32;
begin
Result := FHighBound + 1;
end;
function TAtomMemoryPoolRef.IncRef(Item: Pointer): Int32;
var
N: Integer;
begin
N := (NativeInt(Item) - NativeInt(FMemory)) div FMemSize;
if (N>=0) and (N<=FHighBound) then
Result := TInterlocked.Increment(FDataRef[N])
else
Result:=-1;
end;
End.
简易高效的Delphi原子队列
Delphi一个基于原子操纵的无锁队列,简略单纯高效。实用于多线程大吞吐量操纵的队列。可用于Android体系和32,64位Windows体系。
有如下题目:
1.必须实现开辟内存
2.队列大小必须是2的幂
3. 不能压入空指针
InterlockedAPIs.inc
{*******************************************************}
{ }
{ CodeGear Delphi Runtime Library }
{ }
{ Copyright(c) 1995-2014 Embarcadero Technologies, Inc. }
{ }
{*******************************************************}
{$IFDEF CPUX86}
function InterlockedAdd(var Addend: Integer; Increment: Integer): Integer;
asm
MOV ECX,EAX
MOV EAX,EDX
LOCK XADD [ECX],EAX
ADD EAX,EDX
end;
function InterlockedCompareExchange(var Target: Integer; Exchange: Integer; Comparand: Integer): Integer;
asm
XCHG EAX,ECX
LOCK CMPXCHG [ECX],EDX
end;
function InterlockedCompareExchangePointer(var Target: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
asm
JMP InterlockedCompareExchange
end;
function InterlockedDecrement(var Addend: Integer): Integer;
asm
MOV EDX,-1
JMP InterlockedAdd
end;
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;
asm
MOV ECX,EAX
MOV EAX,[ECX]
@@loop:
LOCK CMPXCHG [ECX],EDX
JNZ @@loop
end;
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
asm
JMP InterlockedExchange
end;
function InterlockedIncrement(var Addend: Integer): Integer;
asm
MOV EDX,1
JMP InterlockedAdd
end;
{$ENDIF CPUX86}
{$IFDEF CPUX64}
function InterlockedExchangeAdd(var Addend: Integer; Value: Integer): Integer;
asm
.NOFRAME
MOV EAX,EDX
LOCK XADD [RCX].Integer,EAX
end;
function InterlockedDecrement(var Addend: LongInt): LongInt;
asm
.NOFRAME
MOV EAX,-1
LOCK XADD [RCX].Integer,EAX
DEC EAX
end;
function InterlockedIncrement(var Addend: LongInt): LongInt;
asm
MOV EAX,1
LOCK XADD [RCX].Integer,EAX
INC EAX
end;
function InterlockedCompareExchange(var Destination: Integer; Exchange: Integer; Comparand: Integer): Integer;
asm
.NOFRAME
MOV EAX,R8d
LOCK CMPXCHG [RCX].Integer,EDX
end;
function InterlockedCompareExchange64(var Destination: Int64; Exchange: Int64; Comparand: Int64): Int64; overload;
asm
.NOFRAME
MOV RAX,R8
LOCK CMPXCHG [RCX],RDX
end;
function InterlockedCompareExchangePointer(var Destination: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
asm
.NOFRAME
MOV RAX,R8
LOCK CMPXCHG [RCX],RDX
end;
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
asm
.NOFRAME
LOCK XCHG [RCX],RDX
MOV RAX,RDX
end;
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;// inline;
asm
.NOFRAME
LOCK XCHG [RCX],EDX
MOV EAX,EDX
end;
{$ENDIF CPUX64}
{$IFDEF CPUARM}
function InterlockedAdd(var Addend: Integer; Increment: Integer): Integer;
begin
Result := AtomicIncrement(Addend, Increment);
end;
function InterlockedCompareExchange(var Target: Integer; Exchange: Integer; Comparand: Integer): Integer;
begin
Result := AtomicCmpExchange(Target, Exchange, Comparand);
end;
function InterlockedCompareExchangePointer(var Target: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
begin
Result := AtomicCmpExchange(Target, Exchange, Comparand);
end;
function InterlockedDecrement(var Addend: Integer): Integer;
begin
Result := AtomicDecrement(Addend);
end;
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;
begin
Result := AtomicExchange(Target, Value);
end;
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
begin
Result := AtomicExchange(Target, Value);
end;
function InterlockedIncrement(var Addend: Integer): Integer;
begin
Result := AtomicIncrement(Addend);
end;
{$ENDIF CPUARM}
utAtomFIFO.pas
unit utAtomFIFO;
interface
Uses
SysUtils,
SyncObjs;
Type
TAtomFIFO = Class
Protected
FWritePtr: Integer;
FReadPtr: Integer;
FCount:Integer;
FHighBound:Integer;
FisEmpty:Integer;
FData: array of Pointer;
function GetSize:Integer;
Public
procedure Push(Item: Pointer);
function Pop: Pointer;
Constructor Create(Size: Integer); Virtual;
Destructor Destroy; Override;
Procedure Empty;
property Size: Integer read GetSize;
property UsedCount:Integer read FCount;
End;
Implementation
{$I InterlockedAPIs.inc}
//创建队列,大小必须是2的幂,须要开辟足够大的队列,防止队列溢出
Constructor TAtomFIFO.Create(Size: Integer);
var
i:NativeInt;
OK:Boolean;
Begin
Inherited Create;
OK:=(Size and (Size-1)=0);
if not OK then raise Exception.Create('FIFO长度必须大于便是256并为2的幂');
try
SetLength(FData,Size);
FHighBound:=Size-1;
except
Raise Exception.Create('FIFO申请内存失败');
end;
End;
Destructor TAtomFIFO.Destroy;
Begin
SetLength(FData,0);
Inherited;
End;
procedure TAtomFIFO.Empty;
begin
while (InterlockedExchange(FReadPtr, 0)<>0) and (InterlockedExchange(FWritePtr, 0)<>0) and (InterlockedExchange(FCount,0)<>0) do;
end;
function TAtomFIFO.GetSize: Integer;
begin
Result:=FHighBound+1;
end;
procedure TAtomFIFO.Push(Item:Pointer);
var
N:Integer;
begin
if Item=nil then Exit;
N:=InterlockedIncrement(FWritePtr) and FHighBound;
FData[N]:=Item;
InterlockedIncrement(FCount);
end;
Function TAtomFIFO.Pop:Pointer;
var
N:Integer;
begin
if InterlockedDecrement(FCount)<0 then
begin
InterlockedIncrement(FCount);
Result:=nil;
end
else
begin
N:=InterlockedIncrement(FReadPtr) and FHighBound;
while FData[N]=nil do Sleep(1);
Result:=FData[N];
FData[N]:=nil;
end;
end;
End.
另外一种方式
unit Iocp.AtomQueue;
interface
Uses
SysUtils,
SyncObjs;
Type
TAtomFIFO = Class
Protected
FWritePtr: Integer;
FReadPtr: Integer;
FCount:Integer;
FHighBound:Integer;
FisEmpty:Integer;
FData: array of Pointer;
function GetSize:Integer;
Public
procedure Push(Item: Pointer);
function Pop: Pointer;
Constructor Create(Size: Integer); Virtual;
Destructor Destroy; Override;
Procedure Empty;
property Size: Integer read GetSize;
property UsedCount:Integer read FCount;
End;
Implementation
//创建队列,大小必须是2的幂,需要开辟足够大的队列,防止队列溢出
Constructor TAtomFIFO.Create(Size: Integer);
var
i:NativeInt;
OK:Boolean;
Begin
Inherited Create;
OK:=(Size and (Size-1)=0);
if not OK then raise Exception.Create('FIFO长度必须大于等于256并为2的幂');
try
SetLength(FData, Size);
FHighBound:=Size-1;
except
Raise Exception.Create('FIFO申请内存失败');
end;
End;
Destructor TAtomFIFO.Destroy;
Begin
SetLength(FData, 0);
Inherited;
End;
procedure TAtomFIFO.Empty;
begin
while (TInterlocked.Exchange(FReadPtr, 0)<>0) and
(TInterlocked.Exchange(FWritePtr, 0)<>0) and
(TInterlocked.Exchange(FCount, 0)<>0) do;
end;
function TAtomFIFO.GetSize: Integer;
begin
Result:=FHighBound+1;
end;
procedure TAtomFIFO.Push(Item:Pointer);
var
N:Integer;
begin
if Item=nil then Exit;
N:=TInterlocked.Increment(FWritePtr) and FHighBound;
FData[N]:=Item;
TInterlocked.Increment(FCount);
end;
Function TAtomFIFO.Pop:Pointer;
var
N:Integer;
begin
if TInterlocked.Decrement(FCount)<0 then
begin
TInterlocked.Increment(FCount);
Result:=nil;
end
else
begin
N:=TInterlocked.Increment(FReadPtr) and FHighBound;
//假设线程A调用了Push,并且正好是第1个push,
//执行了N:=TInterlocked.Increment(FWritePtr) and FHighBound,
//还没执行FData[N]:=Item, 被切换到其他线程
//此时假设线程B调用了Push,并且正好是第2个push,并且执行完毕,这样出现FCount=1,第2个Item不为空,而第一个Item还是nil(线程A还没执行赋值)
//假设线程C执行Pop,由于Count>0(线程B的作用)所以可以执行到这里,但此时FData[N]=nil(线程A还没执行赋值),
//因此线程C要等待线程A完成FData[N]:=Item后,才能取走FData[N]
//出现这种情况的概率应该比较小,基本上不会浪费太多CPU
while FData[N]=nil do Sleep(1);
Result:=FData[N];
FData[N]:=nil;
end;
end;
End.
Delphi的FIFO实现
FIFO主要用于多个不同线程或进程之间数据交换时做缓冲区用,尤其适合实时数据通讯应用中的数据缓冲,接收线程(进程)将数据写入FIFO,处理线程(进程)从FIFO取出数据
本单元中:
TMemoryFIFO类适用于单进程内不同线程之间交换数据
TMapFileFIFO类适用于不同进程之间交换数据
Unit UtFIFO;
Interface
Uses
Windows,
SysUtils,
SyncObjs;
Type
PFIFOStruct= ^TFIFOStruct;
TFIFOStruct= Record
FSize: Integer;
FWritePtr: Integer;
FReadPtr: Integer;
FBuffer: TByteArray;
End;
TFIFOReadFunc= Function(Buf: Pointer; Count: Integer): Integer;
TFIFOReadFuncOfObject= Function(const Buf; Count: Integer): Integer Of Object;
TAbstractFIFO= Class
Protected
FSelfAccess: Boolean;
FDataStruct: PFIFOStruct; // 数据区指针
Procedure AllocateResource(Size: Integer); Virtual; Abstract;
Procedure FreeResources; Virtual; Abstract;
Procedure Lock; Virtual; Abstract;
Procedure UnLock; Virtual; Abstract;
Public
Function FIFOFreeSpace: Integer;
Function FIFOUsedSpace: Integer;
Function CheckFIFOFull: Boolean;
Function CheckFIFOEmpty: Boolean;
Function WriteData(const Buf: Pointer; Count: Integer): Integer; Virtual;
Function ReadData(Buf: Pointer; Count: Integer): Integer; Virtual;
Function ReadDataByFunc(Func: TFIFOReadFuncOfObject;
Count: Integer): Integer; Virtual;
Constructor Create(Size: Integer); Virtual;
Destructor Destroy; Override;
Procedure Empty;
Function Size: Integer;
End;
TMemoryFIFO= Class(TAbstractFIFO)
Protected
FLocker: TCriticalSection;
Procedure AllocateResource(Size: Integer); Override;
Procedure FreeResources; Override;
Procedure Lock; Override;
Procedure UnLock; Override;
Public
Constructor Create(Size: Integer); Override;
Destructor Destroy; Override;
End;
TFileMapFIFO= Class(TAbstractFIFO)
Private
FMaster:Boolean;
FMapHandle: THandle; // 内存映射文件句柄
FMutexHandle: THandle; // 互斥句柄
FMapName: String; // 内存映射对象
FPVHandle: THandle;
Protected
Procedure AllocateResource(Size: Integer); Override;
Procedure FreeResources; Override;
Procedure Lock; Override;
Procedure UnLock; Override;
Public
Constructor Create(Const MapName: String; Size: Integer;bMaster:Boolean); Overload;
Destructor Destroy; Override;
Function WriteData(const Buf: Pointer; Count: Integer): Integer; Override;
Function ReadData(Buf: Pointer; Count: Integer): Integer; Override;
property PVHandle:NativeUInt read FPVHandle;
End;
Implementation
Function Min(Const A, B: Integer): Integer; inline;
begin
if A>B then Result:=B else Result:=A
end;
Constructor TAbstractFIFO.Create(Size: Integer);
Begin
Inherited Create;
AllocateResource(Size);
If Not Assigned(FDataStruct) Then
Raise Exception.Create('FIFO申请内存失败');
End;
Destructor TAbstractFIFO.Destroy;
Begin
FreeResources;
Inherited;
End;
Function TAbstractFIFO.FIFOFreeSpace;
Begin
With FDataStruct^ Do
Begin
Lock;
If FWritePtr> FReadPtr Then
Result:= (FSize- FWritePtr)+ FReadPtr- 1
Else
If FWritePtr< FReadPtr Then
Result:= FReadPtr- FWritePtr- 1
Else
Result:= FSize;
UnLock;
End;
End;
Function TAbstractFIFO.FIFOUsedSpace;
Begin
With FDataStruct^ Do
Begin
Lock;
If FWritePtr> FReadPtr Then
Result:= FWritePtr- FReadPtr
Else
If FWritePtr< FReadPtr Then
Result:= (FSize- FReadPtr)+ FWritePtr
Else
Result:= 0;
UnLock;
End;
End;
Function TAbstractFIFO.CheckFIFOFull: Boolean;
Begin
With FDataStruct^ Do
Begin
Lock;
If (FWritePtr= FSize- 1)And (FReadPtr= 0) Then
Result:= True
Else
If (FWritePtr+ 1= FReadPtr) Then
Result:= True
Else
Result:= False;
UnLock;
End;
End;
Function TAbstractFIFO.CheckFIFOEmpty: Boolean;
Begin
With FDataStruct^ Do
Begin
Lock;
Result:= (FWritePtr= FReadPtr);
UnLock;
End;
End;
Function TAbstractFIFO.WriteData(const Buf: Pointer; Count: Integer): Integer;
Var
N: Integer;
Begin
Result:= 0;
If Count<= 0 Then
Exit;
With FDataStruct^ Do
Begin
Lock;
If FWritePtr< FReadPtr Then //如果没有满或已满
Begin
Result:= Min(Count, FReadPtr- FWritePtr- 1);
Move(Buf^, FBuffer[FWritePtr], Result);
FWritePtr:= (FWritePtr+ Result)Mod FSize;
End
Else
If FWritePtr = FReadPtr Then //Buffer 空
Begin
Result:= Min(Count, FSize- 1);
Move(Buf^, FBuffer[0], Result);
FWritePtr:= Result;
FReadPtr:= 0;
End
Else
Begin
Result:= Min(Count, FSize- FWritePtr);
Move(Buf^, FBuffer[FWritePtr], Result);
if Result=Count then FWritePtr:= (FWritePtr+ Result) Mod FSize
else
Begin
N:= Min(Count- Result, FReadPtr);
Move(PByteArray(Buf)^[Result], FBuffer[0], N);
FWritePtr:= N;
Result:= Result+ N;
End;
End;
UnLock;
End;
End;
Function TAbstractFIFO.ReadData(Buf: Pointer; Count: Integer): Integer;
Var
N: Integer;
Begin
Result:= 0;
If Count<= 0 Then
Exit;
With FDataStruct^ Do
Begin
Lock;
If FReadPtr< FWritePtr Then
Begin
Result:= Min(Count, FWritePtr- FReadPtr);
Move(FBuffer[FReadPtr], Buf^, Result);
FReadPtr:= (FReadPtr+ Result)Mod FSize;
End
Else if FReadPtr>FWritePtr Then
Begin
Result:= Min(Count, FSize- FReadPtr);
Move(FBuffer[FReadPtr], Buf^, Result);
if Result=Count then FReadPtr:=(FReadPtr+Result) mod FSize
else
Begin
N:= Min(Count- Result, FWritePtr);
Move(FBuffer[0], PByteArray(Buf)[Result], N);
FReadPtr:= N;
Result:= Result+ N;
End;
End;
UnLock;
End;
End;
Function TAbstractFIFO.ReadDataByFunc(Func: TFIFOReadFuncOfObject;
Count: Integer): Integer;
Var
N, M: Integer;
Begin
Result:= 0;
If Count<= 0 Then
Exit;
With FDataStruct^ Do
Begin
Lock;
Try
If FReadPtr< FWritePtr Then
Begin
Result:= Func(FBuffer[FReadPtr], Min(Count, FWritePtr- FReadPtr));
FReadPtr:= (FReadPtr+ Result)Mod FSize;
End
Else if FReadPtr>FWritePtr Then
Begin
Result:= Func(FBuffer[FReadPtr], Min(Count, FSize- FReadPtr));
if Result=Count then FReadPtr:=(FReadPtr+Result) mod FSize
else
Begin
N:= Func(FBuffer[0], Min(Count- Result, FWritePtr));
FReadPtr:= N;
Result:= Result+ N;
End;
End;
Finally
UnLock;
End;
End;
End;
Procedure TAbstractFIFO.Empty;
Begin
Lock;
With FDataStruct^ Do
Begin
FWritePtr:= 0;
FReadPtr:= 0;
End;
UnLock;
End;
Function TAbstractFIFO.Size: Integer;
Begin
Result:= FDataStruct^.FSize- 1;
End;
Constructor TMemoryFIFO.Create(Size: Integer);
Begin
Inherited Create(Size);
FLocker:= TCriticalSection.Create;
End;
Destructor TMemoryFIFO.Destroy;
Begin
FLocker.Free;
Inherited;
End;
Procedure TMemoryFIFO.AllocateResource(Size: Integer);
Begin
Inherited;
GetMem(FDataStruct, Size+ 3* Sizeof(Integer));
With FDataStruct^ Do
Begin
FSize:= Size;
FWritePtr:= 0;
FReadPtr:= 0;
End;
End;
Procedure TMemoryFIFO.FreeResources;
Begin
FreeMem(FDataStruct, FDataStruct^.FSize+ 3* Sizeof(Integer));
Inherited;
End;
Procedure TMemoryFIFO.Lock;
Begin
FLocker.Enter;
End;
Procedure TMemoryFIFO.UnLock;
Begin
FLocker.Leave;
End;
// 构造函数
Constructor TFileMapFIFO.Create(Const MapName: String; Size: Integer;bMaster:Boolean);
Begin
FMapName:= MapName;
FMaster:=bMaster;
Inherited Create(Size);
End;
Destructor TFileMapFIFO.Destroy;
Begin
CloseHandle(FPVHandle);
Inherited;
End;
Procedure TFileMapFIFO.AllocateResource(Size: Integer);
Begin
Inherited;
if FMaster then
begin
FMapHandle:= CreateFileMapping($FFFFFFFF, Nil, PAGE_READWRITE, 0,
Size+ 3* Sizeof(Integer), PChar(FMapName));
If (FMapHandle= INVALID_HANDLE_VALUE)Or (FMapHandle= 0) Then
Raise Exception.Create('创建文件映射对象失败!');
end
else
FMapHandle:=OpenFileMapping(FILE_MAP_ALL_ACCESS,False,PChar(FMapName));
FDataStruct:= MapViewOfFile(FMapHandle, FILE_MAP_ALL_ACCESS, 0, 0, 0);
// 创建互斥对象,在写文件映射空间时用到它,以保持数据同步
FMutexHandle:= Windows.CreateMutex(Nil, False, PChar(FMapName+ '.Mtx'));
FPVHandle := CreateEvent(nil,True,False,PChar(FMapName + '.PV'));
If (FMutexHandle= 0)or(FPVHandle = 0) Then
Raise Exception.Create('创建互斥对象失败');
// 判断是否已经建立文件映射了
If (FMapHandle <> 0)And (GetLastError = ERROR_ALREADY_EXISTS) Then
Begin
End
Else
Begin
FillChar(FDataStruct^, Size+ 3* Sizeof(Integer), 0);
FDataStruct^.FSize:= Size;
End
End;
Procedure TFileMapFIFO.FreeResources;
Begin
UnmapViewOfFile(FDataStruct);
CloseHandle(FMutexHandle);
CloseHandle(FMapHandle);
Inherited;
End;
Procedure TFileMapFIFO.Lock;
Begin
WaitForSingleObject(FMutexHandle, INFINITE); // =WAIT_OBJECT_0)
End;
Procedure TFileMapFIFO.UnLock;
Begin
ReleaseMutex(FMutexHandle);
End;
Function TFileMapFIFO.WriteData(const Buf: Pointer; Count: Integer): Integer;
Begin
Lock;
Result:= Inherited WriteData(Buf, Count);
SetEvent(FPVHandle);
UnLock;
End;
Function TFileMapFIFO.ReadData(Buf: Pointer; Count: Integer): Integer;
Begin
Lock;
Result:= Inherited ReadData(Buf, Count);
UnLock;
End;
End.