Skip to content

Commit a56d9e8

Browse files
Merge pull request #1418 from kaleido-io/filters
Enable contract listeners with multiple filters
2 parents 07cc54a + fc18b63 commit a56d9e8

31 files changed

+5169
-639
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
BEGIN;
2+
ALTER TABLE contractlisteners DROP COLUMN filters;
3+
-- no down for the VARCHAR change
4+
COMMIT;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
BEGIN;
2+
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;
3+
-- changing the length of varchar does not affect the index
4+
ALTER TABLE contractlisteners ALTER COLUMN signature TYPE VARCHAR;
5+
COMMIT;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE contractlisteners DROP COLUMN filters;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;
2+
-- in SQLITE VARCHAR is equivalent to TEXT so no migration for signature length

doc-site/docs/reference/types/contractlistener.md

+15-4
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,17 @@ title: ContractListener
4747
| Field Name | Description | Type |
4848
|------------|-------------|------|
4949
| `id` | The UUID of the smart contract listener | [`UUID`](simpletypes.md#uuid) |
50-
| `interface` | A reference to an existing FFI, containing pre-registered type information for the event | [`FFIReference`](#ffireference) |
50+
| `interface` | Deprecated: Please use 'interface' in the array of 'filters' instead | [`FFIReference`](#ffireference) |
5151
| `namespace` | The namespace of the listener, which defines the namespace of all blockchain events detected by this listener | `string` |
5252
| `name` | A descriptive name for the listener | `string` |
5353
| `backendId` | An ID assigned by the blockchain connector to this listener | `string` |
54-
| `location` | A blockchain specific contract identifier. For example an Ethereum contract address, or a Fabric chaincode name and channel | [`JSONAny`](simpletypes.md#jsonany) |
54+
| `location` | Deprecated: Please use 'location' in the array of 'filters' instead | [`JSONAny`](simpletypes.md#jsonany) |
5555
| `created` | The creation time of the listener | [`FFTime`](simpletypes.md#fftime) |
56-
| `event` | The definition of the event, either provided in-line when creating the listener, or extracted from the referenced FFI | [`FFISerializedEvent`](#ffiserializedevent) |
57-
| `signature` | The stringified signature of the event, as computed by the blockchain plugin | `string` |
56+
| `event` | Deprecated: Please use 'event' in the array of 'filters' instead | [`FFISerializedEvent`](#ffiserializedevent) |
57+
| `signature` | A concatenation of all the stringified signature of the event and location, as computed by the blockchain plugin | `string` |
5858
| `topic` | A topic to set on the FireFly event that is emitted each time a blockchain event is detected from the blockchain. Setting this topic on a number of listeners allows applications to easily subscribe to all events they need | `string` |
5959
| `options` | Options that control how the listener subscribes to events from the underlying blockchain | [`ContractListenerOptions`](#contractlisteneroptions) |
60+
| `filters` | A list of filters for the contract listener. Each filter is made up of an Event and an optional Location. Events matching these filters will always be emitted in the order determined by the blockchain. | [`ListenerFilter[]`](#listenerfilter) |
6061

6162
## FFIReference
6263

@@ -92,3 +93,13 @@ title: ContractListener
9293
| `firstEvent` | A blockchain specific string, such as a block number, to start listening from. The special strings 'oldest' and 'newest' are supported by all blockchain connectors. Default is 'newest' | `string` |
9394

9495

96+
## ListenerFilter
97+
98+
| Field Name | Description | Type |
99+
|------------|-------------|------|
100+
| `event` | The definition of the event, either provided in-line when creating the listener, or extracted from the referenced FFI | [`FFISerializedEvent`](#ffiserializedevent) |
101+
| `location` | A blockchain specific contract identifier. For example an Ethereum contract address, or a Fabric chaincode name and channel | [`JSONAny`](simpletypes.md#jsonany) |
102+
| `interface` | A reference to an existing FFI, containing pre-registered type information for the event | [`FFIReference`](#ffireference) |
103+
| `signature` | The stringified signature of the event and location, as computed by the blockchain plugin | `string` |
104+
105+

doc-site/docs/swagger/swagger.yaml

+1,598-144
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright © 2024 Kaleido, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package apiserver
18+
19+
import (
20+
"net/http"
21+
22+
"github.com/hyperledger/firefly-common/pkg/ffapi"
23+
"github.com/hyperledger/firefly/internal/coremsgs"
24+
"github.com/hyperledger/firefly/internal/orchestrator"
25+
"github.com/hyperledger/firefly/pkg/core"
26+
)
27+
28+
/*
29+
*
30+
31+
This API provides the ability to retrieve the signature for the filters of a contract listener
32+
33+
*
34+
*/
35+
var postContractListenerSignature = &ffapi.Route{
36+
Name: "postContractListenerSignature",
37+
Path: "contracts/listeners/signature",
38+
Method: http.MethodPost,
39+
PathParams: nil,
40+
QueryParams: nil,
41+
Description: coremsgs.APIEndpointsPostContractListenerHash,
42+
JSONInputValue: func() interface{} { return &core.ContractListenerInput{} },
43+
JSONOutputValue: func() interface{} { return &core.ContractListenerSignatureOutput{} },
44+
JSONOutputCodes: []int{http.StatusOK},
45+
Extensions: &coreExtensions{
46+
EnabledIf: func(or orchestrator.Orchestrator) bool {
47+
return or.Contracts() != nil
48+
},
49+
CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) {
50+
return cr.or.Contracts().ConstructContractListenerSignature(cr.ctx, r.Input.(*core.ContractListenerInput))
51+
},
52+
},
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright © 2022 Kaleido, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package apiserver
18+
19+
import (
20+
"bytes"
21+
"encoding/json"
22+
"net/http/httptest"
23+
"testing"
24+
25+
"github.com/hyperledger/firefly/mocks/contractmocks"
26+
"github.com/hyperledger/firefly/pkg/core"
27+
"github.com/stretchr/testify/assert"
28+
"github.com/stretchr/testify/mock"
29+
)
30+
31+
func TestNewContractListenerSignature(t *testing.T) {
32+
o, r := newTestAPIServer()
33+
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
34+
mcm := &contractmocks.Manager{}
35+
o.On("Contracts").Return(mcm)
36+
input := core.ContractListenerInput{}
37+
var buf bytes.Buffer
38+
json.NewEncoder(&buf).Encode(&input)
39+
req := httptest.NewRequest("POST", "/api/v1/namespaces/mynamespace/contracts/listeners/signature", &buf)
40+
req.Header.Set("Content-Type", "application/json; charset=utf-8")
41+
res := httptest.NewRecorder()
42+
43+
mcm.On("ConstructContractListenerSignature", mock.Anything, mock.AnythingOfType("*core.ContractListenerInput")).
44+
Return(&core.ContractListenerSignatureOutput{}, nil, nil)
45+
r.ServeHTTP(res, req)
46+
47+
assert.Equal(t, 200, res.Result().StatusCode)
48+
}

internal/apiserver/routes.go

+1
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ var routes = append(
134134
postContractAPIPublish,
135135
postContractAPIQuery,
136136
postContractAPIListeners,
137+
postContractListenerSignature,
137138
postContractInterfaceGenerate,
138139
postContractInterfacePublish,
139140
postContractDeploy,

internal/blockchain/ethereum/ethereum.go

+104-9
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,26 @@ func (e *Ethereum) QueryContract(ctx context.Context, signingKey string, locatio
843843
return output, nil // note UNLIKE fabric this is just `output`, not `output.Result` - but either way the top level of what we return to the end user, is whatever the Connector sent us
844844
}
845845

846+
func (e *Ethereum) CheckOverlappingLocations(ctx context.Context, left *fftypes.JSONAny, right *fftypes.JSONAny) (bool, error) {
847+
if left == nil || right == nil {
848+
// No location on either side so overlapping
849+
return true, nil
850+
}
851+
852+
parsedLeft, err := e.parseContractLocation(ctx, left)
853+
if err != nil {
854+
return false, err
855+
}
856+
857+
parsedRight, err := e.parseContractLocation(ctx, right)
858+
if err != nil {
859+
return false, err
860+
}
861+
862+
// For Ethereum just compared addresses
863+
return strings.EqualFold(parsedLeft.Address, parsedRight.Address), nil
864+
}
865+
846866
func (e *Ethereum) NormalizeContractLocation(ctx context.Context, ntype blockchain.NormalizeType, location *fftypes.JSONAny) (result *fftypes.JSONAny, err error) {
847867
parsed, err := e.parseContractLocation(ctx, location)
848868
if err != nil {
@@ -875,25 +895,56 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio
875895
}
876896

877897
func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) {
878-
var location *Location
879898
namespace := listener.Namespace
880-
if listener.Location != nil {
881-
location, err = e.parseContractLocation(ctx, listener.Location)
899+
filters := make([]*filter, 0)
900+
901+
if len(listener.Filters) == 0 {
902+
return i18n.NewError(ctx, coremsgs.MsgFiltersEmpty, listener.Name)
903+
}
904+
905+
// For ethconnect we need to use one event and one location as it does not support filters
906+
// Note: the first filter event gets copied to the root of the listener for backwards
907+
// compatibility so available here
908+
// it will be ignored by evmconnect
909+
var firstEventABI *abi.Entry
910+
firstEventABI, err = ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Filters[0].Event.FFIEventDefinition)
911+
if err != nil {
912+
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
913+
}
914+
915+
// First filter location is copied over to the root
916+
var location *Location
917+
if listener.Filters[0].Location != nil {
918+
location, err = e.parseContractLocation(ctx, listener.Filters[0].Location)
882919
if err != nil {
883920
return err
884921
}
885922
}
886-
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Event.FFIEventDefinition)
887-
if err != nil {
888-
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
923+
924+
for _, f := range listener.Filters {
925+
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &f.Event.FFIEventDefinition)
926+
if err != nil {
927+
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
928+
}
929+
evmFilter := &filter{
930+
Event: abi,
931+
}
932+
if f.Location != nil {
933+
location, err := e.parseContractLocation(ctx, f.Location)
934+
if err != nil {
935+
return err
936+
}
937+
evmFilter.Address = location.Address
938+
}
939+
filters = append(filters, evmFilter)
889940
}
890941

891942
subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
892943
firstEvent := string(core.SubOptsFirstEventNewest)
893944
if listener.Options != nil {
894945
firstEvent = listener.Options.FirstEvent
895946
}
896-
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi, lastProtocolID)
947+
result, err := e.streams.createSubscription(ctx, e.streamID[namespace], subName, firstEvent, location, firstEventABI, filters, lastProtocolID)
897948
if err != nil {
898949
return err
899950
}
@@ -934,12 +985,56 @@ func (e *Ethereum) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamVa
934985
return &ffi2abi.ParamValidator{}, nil
935986
}
936987

937-
func (e *Ethereum) GenerateEventSignature(ctx context.Context, event *fftypes.FFIEventDefinition) string {
988+
func (e *Ethereum) GenerateEventSignature(ctx context.Context, event *fftypes.FFIEventDefinition) (string, error) {
938989
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, event)
939990
if err != nil {
991+
return "", err
992+
}
993+
signature := ffi2abi.ABIMethodToSignature(abi)
994+
indexedSignature := ABIMethodToIndexedSignature(abi)
995+
if indexedSignature != "" {
996+
signature = fmt.Sprintf("%s %s", signature, indexedSignature)
997+
}
998+
return signature, nil
999+
}
1000+
1001+
func (e *Ethereum) GenerateEventSignatureWithLocation(ctx context.Context, event *fftypes.FFIEventDefinition, location *fftypes.JSONAny) (string, error) {
1002+
eventSignature, err := e.GenerateEventSignature(ctx, event)
1003+
if err != nil {
1004+
// new error here needed
1005+
return "", err
1006+
}
1007+
1008+
// No location set
1009+
if location == nil {
1010+
return fmt.Sprintf("*:%s", eventSignature), nil
1011+
}
1012+
1013+
parsed, err := e.parseContractLocation(ctx, location)
1014+
if err != nil {
1015+
return "", err
1016+
}
1017+
1018+
return fmt.Sprintf("%s:%s", parsed.Address, eventSignature), nil
1019+
}
1020+
1021+
func ABIMethodToIndexedSignature(abi *abi.Entry) string {
1022+
if len(abi.Inputs) == 0 {
9401023
return ""
9411024
}
942-
return ffi2abi.ABIMethodToSignature(abi)
1025+
positions := []string{}
1026+
for i, param := range abi.Inputs {
1027+
if param.Indexed {
1028+
positions = append(positions, fmt.Sprint(i))
1029+
}
1030+
}
1031+
1032+
// No indexed fields
1033+
if len(positions) == 0 {
1034+
return ""
1035+
}
1036+
1037+
return "[i=" + strings.Join(positions, ",") + "]"
9431038
}
9441039

9451040
func (e *Ethereum) GenerateErrorSignature(ctx context.Context, errorDef *fftypes.FFIErrorDefinition) string {

0 commit comments

Comments
 (0)