开发者

Can we use TDSProviderConnection to replace TLocalConnection for in-process DataSnap application?

开发者 https://www.devze.com 2022-12-08 18:00 出处:网络
I able to access server method by in-process DataSnap application. Click here for details. However, there is another aspect of in-process datasnap application. It is the IAppServer or TDataSetProvide

I able to access server method by in-process DataSnap application. Click here for details.

However, there is another aspect of in-process datasnap application. It is the IAppServer or TDataSetProvider.

Prior to Delphi 2009, I use TConnectionBroker with TLocalConnection for in-process datasnap access. The new Delphi 2009/2010 DataSnap allow us to use TDSProviderConnection to as RemoteServer. However, I can only make it works for TCP/HTTP connection. I can't use TDSProviderConnection for in-process datasnap application. It will prompt "invalid pointer operation".

This is how my code looks like:

var o: TDataModule1;
    Q: TSQLConnection;
    c: TEmployeeServerClient;
begin
  o := TDataModule1.Create(Self); 
  Q := TSQLConnection.Create(Self);
  try
    Q.DriverName := 'DSServer1';
    Q.LoginPrompt := False;
    Q.Open;

    DSProviderConnection1.SQLConnection := Q;
    DSProviderConnection1.ServerClassName := 'TEmployeeServer';
    DSProvi开发者_运维技巧derConnection1.Connected := True;

    ClientDataSet1.ProviderName := 'DataSetProvider1';
    ClientDataSet1.Open;
  finally
    o.Free;
    Q.Free;
  end;
end;

The TEmployeeServer is a TDSServerModule class descendant that consist of TDataSetProvider, TSQLDataSet and TSQLConnection that connect together.

After tracing the source code, I found the TSQLDataSet did open and traverse the dataset. The cause of the problem should be related to the following 2 methods that use TDBXNoOpRow

function TDSVoidConnectionHandler.CreateDbxRow: TDBXStreamerRow;
begin
  Result := TDBXNoOpRow.Create(DBXContext);
end;

function TDSServerCommand.CreateParameterRow: TDBXRow;
begin
  Result := TDBXNoOpRow.Create(FDbxContext);
end;

The TDBXNoOpRow instance will consumed by

procedure TDBXStreamValue.SetRowValue;
begin
  if FExtendedType then
  begin
    if FStreamStreamReader <> nil then
      FDbxRow.SetStream(Self, FStreamStreamReader)
    else if FByteStreamReader <> nil then
      FDbxRow.SetStream(Self, FByteStreamReader)
    else
      inherited SetRowValue;
  end else
    inherited SetRowValue;
end;

Since TDBXNoOpRow doesn't nothing, the data packet doesn't get transfer by above method. I suspect this is the cause of the problem using in-process machanism.

I am not sure if we able to throw away TLocalConnection and replaced by TDSProviderConnection for in-process DataSnap application? I have traced the DBX source code for days and can't even find a clue on this issue.


Classic DataSnap

Prior to Delphi 2009, we may use either TLocalConnection or TSocketConnection together with TConnectionBroker for in-process or out-of-process communication via IAppServer interface. There are even more DataSnap connection that supports IAppServer. Check Delphi helps for details.

New DataSnap from Delphi 2009

Previously, TSQLConnection was used in DataSnap server only. In new DataSnap, we may use TSQLConnection in DataSnap client. There is a new driver call DataSnap that allow us to connect to a DataSnap server either via TCP or HTTP protocol using REST data packet for multi-tier application. Furthermore, we may use connect to TDSSever (TDSServer.Name) via TSQLConnection.DriverName for in-process connection. This benefits us to write a scalable multi-tier DataSnap application to consume server methods. See here for more details.

In Delphi 2009/2010, a new DataSnap connection component – TDSProviderConnection was introduced. As the name implied, it supply providers from DataSnap server. This connection require a TSQLConnection instance to work with in client tier. Thus, we may use a single TSQLConnection in client tier either in-process or out-of-process. And that fulfill the design philosophy of scalable multi-tier DataSnap application.

There are lots of demo or CodeRage videos available in the web showing how to TDSProviderConnection in DataSnap client tier. However, most of the examples only showing out-of-process design. I never find one example illustrate the usage of TDSProviderConnection for in-process design while writing this topic. Hope there are more from other famous or well know Delphi fans.

At first, I thought it is easy to use TDSProviderConnection for in-process design. But I face problems while follow the rules. These problems should be related to bugs and in mature design of DataSnap framework. I will show at here how to deals with the problems.

Design a DataSnap module

First, we design a simple DataSnap module for this example. This is a TDSServerModule descendant instance with 2 components: a TDataSetProvider and a TClientDataSet instance. The reason using TDSServerModule is it will manage providers define in the module.

MySeverProvider.DFM

object ServerProvider: TServerProvider
  OldCreateOrder = False
  OnCreate = DSServerModuleCreate
  Height = 225
  Width = 474
  object DataSetProvider1: TDataSetProvider
    DataSet = ClientDataSet1
    Left = 88
    Top = 56
  end
  object ClientDataSet1: TClientDataSet
    Aggregates = <>
    Params = <>
    Left = 200
    Top = 56
  end
end

MyServerProvider.PAS

type
  TServerProvider = class(TDSServerModule)
    DataSetProvider1: TDataSetProvider;
    ClientDataSet1: TClientDataSet;
    procedure DSServerModuleCreate(Sender: TObject);
  end;

{$R *.dfm}

procedure TServerProvider.DSServerModuleCreate(Sender: TObject);
begin
  ClientDataSet1.LoadFromFile('..\orders.cds');
end;

Define a transport layer for the provider module

Since this is a in-process application, we don’t really need a physical transport layer for the provider module. What we need here is a TDSServer and a TDSServerClass instance that helps propagate the providers to ClientDataSet in later stage.

var C: TDSServer:
    D: TDSServerClass;
begin
  C := TDSServer.Create(nil);
  D := TDSServerClass.Create(nil);
  try
    C.Server := D;
    C.OnGetClass := OnGetClass;
    D.Start;

  finally
    D.Free;
    C.Free;
  end;
end;

procedure TForm1.OnGetClass(DSServerClass: TDSServerClass; var
    PersistentClass: TPersistentClass);
begin
  PersistentClass := TServerProvider;
end;

Use TDSProviderConnection to consume the in-process DataSnap service

We start hook up everything in DataSnap context to get it done:

var Q: TSQLConnection;
    D: TDSServer;
    C: TDSServerClass;
    P: TServerProvider;
    N: TDSProviderConnection;
begin
  P := TServerProvider.Create(nil);
  D := TDSServer.Create(nil);
  C := TDSServerClass.Create(nil);
  Q := TSQLConnection.Create(nil);
  N := TDSProviderConnection.Create(nil);
  try
    C.Server := D;
    C.OnGetClass := OnGetClass;

    D.Start;

    Q.DriverName := 'DSServer';
    Q.LoginPrompt := False;
    Q.Open;

    N.SQLConnection := Q;
    N.ServerClassName := 'TServerProvider';
    N.Connected := True;

    ClientDataSet1.RemoteServer := N;
    ClientDataSet1.ProviderName := 'DataSetProvider1';
    ClientDataSet1.Open;

    ShowMessage(IntToStr(ClientDataSet1.RecordCount));
  finally
    N.Free;
    Q.Free;
    C.Free;
    D.Free;
    P.Free;
  end;
end;

If you are using Delphi version 14.0.3513.24210 or prior than that, you will find it doesn’t work, a “Invalid pointer operation” exception raise after that.

I have found all the problems faced so far and the fixed are as follow.

Troubleshoot: Invalid pointer operation

There is a bug in DSUtil.StreamToDataPacket. I have file a report in QC#78666.

Here is a fix without changing the DBX source code:

unit DSUtil.QC78666;

interface

implementation

uses SysUtils, Variants, VarUtils, ActiveX, Classes, DBXCommonResStrs, DSUtil,
     CodeRedirect;

type
  THeader = class
    const
      Empty       = 1;
      Variant     = 2;
      DataPacket  = 3;
  end;

  PIntArray = ^TIntArray;
  TIntArray = array[0..0] of Integer;

  TVarFlag = (vfByRef, vfVariant);
  TVarFlags = set of TVarFlag;

  EInterpreterError = class(Exception);

  TVariantStreamer = class
  private
    class function ReadArray(VType: Integer; const Data: TStream): OleVariant;
  public
    class function ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;
  end;

const
  EasyArrayTypes = [varSmallInt, varInteger, varSingle, varDouble, varCurrency,
                    varDate, varBoolean, varShortInt, varByte, varWord, varLongWord];

  VariantSize: array[0..varLongWord] of Word  = (0, 0, SizeOf(SmallInt), SizeOf(Integer),
    SizeOf(Single), SizeOf(Double), SizeOf(Currency), SizeOf(TDateTime), 0, 0,
    SizeOf(Integer), SizeOf(WordBool), 0, 0, 0, 0, SizeOf(ShortInt), SizeOf(Byte),
    SizeOf(Word), SizeOf(LongWord));

class function TVariantStreamer.ReadArray(VType: Integer; const Data: TStream): OleVariant;
var
  Flags: TVarFlags;
  LoDim, HiDim, Indices, Bounds: PIntArray;
  DimCount, VSize, i: Integer;
  V: OleVariant;
  LSafeArray: PSafeArray;
  P: Pointer;
begin
  VarClear(Result);
  Data.Read(DimCount, SizeOf(DimCount));
  VSize := DimCount * SizeOf(Integer);
  GetMem(LoDim, VSize);
  try
    GetMem(HiDim, VSize);
    try
      Data.Read(LoDim^, VSize);
      Data.Read(HiDim^, VSize);
      GetMem(Bounds, VSize * 2);
      try
        for i := 0 to DimCount - 1 do
        begin
          Bounds[i * 2] := LoDim[i];
          Bounds[i * 2 + 1] := HiDim[i];
        end;
        Result := VarArrayCreate(Slice(Bounds^,DimCount * 2), VType and varTypeMask);
      finally
        FreeMem(Bounds);
      end;
      if VType and varTypeMask in EasyArrayTypes then
      begin
        Data.Read(VSize, SizeOf(VSize));
        P := VarArrayLock(Result);
        try
          Data.Read(P^, VSize);
        finally
          VarArrayUnlock(Result);
        end;
      end else
      begin
        LSafeArray := PSafeArray(TVarData(Result).VArray);
        GetMem(Indices, VSize);
        try
          FillChar(Indices^, VSize, 0);
          for I := 0 to DimCount - 1 do
            Indices[I] := LoDim[I];
          while True do
          begin
            V := ReadVariant(Flags, Data);
            if VType and varTypeMask = varVariant then
              SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, V))
            else
              SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, TVarData(V).VPointer^));
            Inc(Indices[DimCount - 1]);
            if Indices[DimCount - 1] > HiDim[DimCount - 1] then
              for i := DimCount - 1 downto 0 do
                if Indices[i] > HiDim[i] then
                begin
                  if i = 0 then Exit;
                  Inc(Indices[i - 1]);
                  Indices[i] := LoDim[i];
                end;
          end;
        finally
          FreeMem(Indices);
        end;
      end;
    finally
      FreeMem(HiDim);
    end;
  finally
    FreeMem(LoDim);
  end;
end;

class function TVariantStreamer.ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;
var
  I, VType: Integer;
  W: WideString;
  TmpFlags: TVarFlags;
begin
  VarClear(Result);
  Flags := [];
  Data.Read(VType, SizeOf(VType));
  if VType and varByRef = varByRef then
    Include(Flags, vfByRef);
  if VType = varByRef then
  begin
    Include(Flags, vfVariant);
    Result := ReadVariant(TmpFlags, Data);
    Exit;
  end;
  if vfByRef in Flags then
    VType := VType xor varByRef;
  if (VType and varArray) = varArray then
    Result := ReadArray(VType, Data) else
  case VType and varTypeMask of
    varEmpty: VarClear(Result);
    varNull: Result := NULL;
    varOleStr:
    begin
      Data.Read(I, SizeOf(Integer));
      SetLength(W, I);
      Data.Read(W[1], I * 2);
      Result := W;
    end;
    varDispatch, varUnknown:
      raise EInterpreterError.CreateResFmt(@SBadVariantType,[IntToHex(VType,4)]);
  else
    TVarData(Result).VType := VType;
    Data.Read(TVarData(Result).VPointer, VariantSize[VType and varTypeMask]);
  end;
end;

procedure StreamToDataPacket(const Stream: TStream; out VarBytes: OleVariant);
var
  P: Pointer;
  ByteCount: Integer;
  Size: Int64;
begin
  Stream.Read(Size, 8);
  ByteCount := Integer(Size);
  if ByteCount > 0 then
  begin
    VarBytes := VarArrayCreate([0, ByteCount-1], varByte);
    P := VarArrayLock(VarBytes);
    try
//      Stream.Position := 0;   // QC#78666 "Mismatched in datapacket" with DSUtil.StreamToDataPacket
      Stream.Read(P^, ByteCount);
      Stream.Position := 0;
    finally
      VarArrayUnlock(VarBytes);
    end;
  end
  else
    VarBytes := Null;
end;

procedure StreamToVariantPatch(const Stream: TStream; out VariantValue: OleVariant);
var
  Flags: TVarFlags;
  Header: Byte;
begin
  if Assigned(Stream) then
  begin
    Stream.Position := 0;
    Stream.Read(Header, 1);
    if Header = THeader.Variant then
      VariantValue := TVariantStreamer.ReadVariant(Flags, Stream)
    else if Header = THeader.DataPacket then
      StreamToDataPacket(Stream, VariantValue)
    else
      Assert(false);
  end;
end;

var QC78666: TCodeRedirect;

initialization
  QC78666 := TCodeRedirect.Create(@StreamToVariant, @StreamToVariantPatch);
finalization
  QC78666.Free;
end.

Troubleshoot: I still encounter “Invalid Pointer Operation” after apply DSUtil.StreamToDataPacket patch

I have filed this problem in QC#78752. An in-process DataSnap create an instance of TDSServerCommand. A method of TDSServerCommand create TDBXNoOpRow instance:

function TDSServerCommand.CreateParameterRow: TDBXRow;
begin
  Result := TDBXNoOpRow.Create(FDbxContext);
end;

Most of the methods in TDBXNoOpRow is not implemented. There are 2 methods in class TDBXNoOpRow, GetStream and SetStream are used in subsequence operations. This is the reason that cause the exception.

After fix TDBXNoOpRow problem, the data packet will transport to ClientDataSet successfully.

The fix is as follow:

unit DBXCommonServer.QC78752;

interface

uses SysUtils, Classes, DBXCommon, DSCommonServer, DBXCommonTable;

type
  TDSServerCommand_Patch = class(TDSServerCommand)
  protected
    function CreateParameterRowPatch: TDBXRow;
  end;

  TDBXNoOpRowPatch = class(TDBXNoOpRow)
  private
    function GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes): Integer;
  protected
    procedure GetStream(DbxValue: TDBXStreamValue; var Stream: TStream; var IsNull:
        LongBool); override;
    procedure SetStream(DbxValue: TDBXStreamValue; StreamReader: TDBXStreamReader);
        override;
    function UseExtendedTypes: Boolean; override;
  end;

  TDBXStreamValueAccess = class(TDBXByteArrayValue)
  private
    FStreamStreamReader: TDBXLookAheadStreamReader;
  end;

implementation

uses CodeRedirect;

function TDSServerCommand_Patch.CreateParameterRowPatch: TDBXRow;
begin
  Result := TDBXNoOpRowPatch.Create(FDbxContext);
end;

procedure TDBXNoOpRowPatch.GetStream(DbxValue: TDBXStreamValue; var Stream: TStream;
    var IsNull: LongBool);
var iSize: integer;
    B: TBytes;
begin
  iSize := GetBytesFromStreamReader(TDBXStreamValueAccess(DbxValue).FStreamStreamReader, B);
  IsNull := iSize = 0;
  if not IsNull then begin
    Stream := TMemoryStream.Create;
    Stream.Write(B[0], iSize);
  end;
end;

procedure TDBXNoOpRowPatch.SetStream(DbxValue: TDBXStreamValue; StreamReader:
    TDBXStreamReader);
var B: TBytes;
    iSize: integer;
begin
  iSize := GetBytesFromStreamReader(StreamReader, B);
  Dbxvalue.SetDynamicBytes(0, B, 0, iSize);
end;

function TDBXNoOpRowPatch.GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes):
    Integer;
const BufSize = 50 * 1024;
var iPos: integer;
    iRead: integer;
begin
  Result := 0;
  while not R.Eos do begin
    SetLength(Buf, Result + BufSize);
    iPos := Result;
    iRead := R.Read(Buf, iPos, BufSize);
    Inc(Result, iRead);
  end;
  SetLength(Buf, Result);
end;

function TDBXNoOpRowPatch.UseExtendedTypes: Boolean;
begin
  Result := True;
end;

var QC78752: TCodeRedirect;

initialization
  QC78752 := TCodeRedirect.Create(@TDSServerCommand_Patch.CreateParameterRow, @TDSServerCommand_Patch.CreateParameterRowPatch);
finalization
  QC78752.Free;
end.

Troubleshoot: Both patches applied and work for the example but I still encounter “Invalid Pointer Operation”

This problem also filed in QC#78752. The problem is due to the following 2 methods:

  1. procedure TDBXStreamValue.SetValue
  2. function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;

TDBXLookAheadStreamReader.ConvertToMemoryStream return a managed FStream object to TDBXStreamValue.SetValue. This stream object become another managed object of TDBXStreamValue. It turns out that a Stream object managed by two objects and the exception raised when these 2 objects attempt to free the Stream object:

procedure TDBXStreamValue.SetValue(const Value: TDBXValue);
begin
  if Value.IsNull then
    SetNull
  else
  begin
    SetStream(Value.GetStream(False), True);
  end;
end;
function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;
...
begin
  if FStream = nil then
    Result := nil
  else
  begin
    Count := Size;
    if not (FStream is TMemoryStream) then
    begin
      ...
      StreamTemp := FStream;
      FStream := Stream;
      FreeAndNil(StreamTemp);
    end;
    FStream.Seek(0, soFromBeginning);
    FHasLookAheadByte := false;
    Result := FStream;
  end;
end;

The fix is as follow:

unit DBXCommon.QC78752;

interface

implementation

uses SysUtils, Classes, DBXCommon, CodeRedirect;

type
  TDBXLookAheadStreamReaderAccess = class(TDBXStreamReader)
  private
    FStream: TStream;
    FEOS:               Boolean;
    FHasLookAheadByte:  Boolean;
    FLookAheadByte:     Byte;
  end;

  TDBXLookAheadStreamReaderHelper = class helper for TDBXLookAheadStreamReader
  private
    function Accessor: TDBXLookAheadStreamReaderAccess;
  public
    function ConvertToMemoryStreamPatch: TStream;
  end;

function TDBXLookAheadStreamReaderHelper.Accessor:
    TDBXLookAheadStreamReaderAccess;
begin
  Result := TDBXLookAheadStreamReaderAccess(Self);
end;

function TDBXLookAheadStreamReaderHelper.ConvertToMemoryStreamPatch: TStream;
var
  Stream: TMemoryStream;
  StreamTemp: TStream;
  Count: Integer;
  Buffer: TBytes;
  ReadBytes: Integer;
begin
  if Accessor.FStream = nil then
    Result := nil
  else
  begin
    Count := Size;
    if not (Accessor.FStream is TMemoryStream) then
    begin
      Stream := TMemoryStream.Create;
      if Count >= 0 then
        Stream.SetSize(Count);
      if Accessor.FHasLookAheadByte then
        Stream.Write(Accessor.FLookAheadByte, 1);
      SetLength(Buffer, 256);
      while true do
      begin
        ReadBytes := Accessor.FStream.Read(Buffer, Length(Buffer));
        if ReadBytes > 0 then
          Stream.Write(Buffer, ReadBytes)
        else
          Break;
      end;
      StreamTemp := Accessor.FStream;
      Accessor.FStream := Stream;
      FreeAndNil(StreamTemp);
      Result := Accessor.FStream;
    end else begin
      Stream := TMemoryStream.Create;
      Accessor.FStream.Seek(0, soFromBeginning);
      Stream.CopyFrom(Accessor.FStream, Accessor.FStream.Size);
    end;
    Stream.Seek(0, soFromBeginning);
    Accessor.FHasLookAheadByte := false;

    Result := Stream;
//    Stream := TMemoryStream.Create;
//    Stream.LoadFromStream(FStream);
//    FStream.Seek(0, soFromBeginning);
//    Result := Stream;
  end;
end;

var QC78752: TCodeRedirect;

initialization
  QC78752 := TCodeRedirect.Create(@TDBXLookAheadStreamReader.ConvertToMemoryStream, @TDBXLookAheadStreamReader.ConvertToMemoryStreamPatch);
finalization
  QC78752.Free;
end.

Troubleshoot: I encounter memory leaks after close the application

There is a memory leaks in TDSServerConnection for in-process connection. I have filed a report in QC#78696.

Here is the fix:

unit DSServer.QC78696;

interface

implementation

uses SysUtils,
     DBXCommon, DSServer, DSCommonServer, DBXMessageHandlerCommon, DBXSqlScanner,
     DBXTransport,
     CodeRedirect;

type
  TDSServerConnectionHandlerAccess = class(TDBXConnectionHandler)
    FConProperties: TDBXProperties;
    FConHandle: Integer;
    FServer: TDSCustomServer;
    FDatabaseConnectionHandler: TObject;
    FHasServerConnection: Boolean;
    FInstanceProvider: TDSHashtableInstanceProvider;
    FCommandHandlers: TDBXCommandHandlerArray;
    FLastCommandHandler: Integer;
    FNextHandler: TDBXConnectionHandler;
    FErrorMessage: TDBXErrorMessage;
    FScanner: TDBXSqlScanner;
    FDbxConnection: TDBXConnection;
    FTransport: TDSServerTransport;
    FChannel: TDbxChannel;
    FCreateInstanceEventObject: TDSCreateInstanceEventObject;
    FDestroyInstanceEventObject: TDSDestroyInstanceEventObject;
    FPrepareEventObject: TDSPrepareEventObject;
    FConnectEventObject: TDSConnectEventObject;
    FErrorEventObject: TDSErrorEventObject;
    FServerCon: TDSServerConnection;
  end;

  TDSServerConnectionPatch = class(TDSServerConnection)
  public
    destructor Destroy; override;
  end;

  TDSServerDriverPatch = class(TDSServerDriver)
  protected
    function CreateConnectionPatch(ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;
  end;

destructor TDSServerConnectionPatch.Destroy;
begin
  inherited Destroy;
  TDSServerConnectionHandlerAccess(ServerConnectionHandler).FServerCon := nil;
  ServerConnectionHandler.Free;
end;

function TDSServerDriverPatch.CreateConnectionPatch(
  ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;
begin
  Result := TDSServerConnectionPatch.Create(ConnectionBuilder);
end;

var QC78696: TCodeRedirect;

initialization
  QC78696 := TCodeRedirect.Create(@TDSServerDriverPatch.CreateConnection, @TDSServerDriverPatch.CreateConnectionPatch);
finalization
  QC78696.Free;
end.
0

精彩评论

暂无评论...
验证码 换一张
取 消