Skip to content

Commit c2e115d

Browse files
author
David Fallah
committedMar 6, 2018
Implement support for Rx extensions (#1)
This commit also fixes minor documentation issues and involves numerous code style changes.
1 parent fd288c2 commit c2e115d

9 files changed

+409
-128
lines changed
 

‎AsyncBus.Tests/BusSpec.cs

+111-92
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
using Xunit;
2-
using Shouldly;
31
using System;
4-
using System.Threading.Tasks;
52
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Shouldly;
5+
using Xunit;
66

77
namespace AsyncBus.Tests
88
{
@@ -15,14 +15,6 @@ public BusSpec()
1515
_bus = BusSetup.CreateBus();
1616
}
1717

18-
[Fact]
19-
internal void Bus_Should_Allow_Registering_Subscriber_Returning_Void()
20-
{
21-
void Callback(string value) { }
22-
23-
Should.NotThrow(() => _bus.SubscribeSync<string>(Callback));
24-
}
25-
2618
[Fact]
2719
internal void Bus_Should_Allow_Registering_Subscriber_Returning_Task()
2820
{
@@ -32,81 +24,49 @@ internal void Bus_Should_Allow_Registering_Subscriber_Returning_Task()
3224
}
3325

3426
[Fact]
35-
internal void Bus_Should_Allow_Registering_Subscriber_Taking_Cancellation_Token()
27+
internal void Bus_Should_Allow_Registering_Subscriber_Returning_Void()
3628
{
37-
Task Callback(string value, CancellationToken cancellationToken) => Task.CompletedTask;
29+
void Callback(string value)
30+
{
31+
}
3832

39-
Should.NotThrow(() => _bus.Subscribe<string>(Callback));
33+
Should.NotThrow(() => _bus.SubscribeSync<string>(Callback));
4034
}
4135

4236
[Fact]
43-
internal async Task Subscriber_Should_Not_Be_Called_After_Corresponding_Subscription_Token_Disposed()
37+
internal void Bus_Should_Allow_Registering_Subscriber_Taking_Cancellation_Token()
4438
{
45-
// GIVEN we register a callback to update a local variable.
46-
int? receivedValue = null;
47-
void Callback(int value) => receivedValue = value;
48-
var subscriptionToken = _bus.SubscribeSync<int>(Callback);
49-
50-
// WHEN we publish a value.
51-
await _bus.Publish(3);
52-
53-
// THEN the local variable should be this value.
54-
receivedValue.ShouldBe(3);
55-
receivedValue = null;
56-
57-
// WHEN we dispose the subscription token.
58-
subscriptionToken.Dispose();
59-
60-
// AND we publish another value.
61-
await _bus.Publish(5);
39+
Task Callback(string value, CancellationToken cancellationToken) => Task.CompletedTask;
6240

63-
// THEN the variable should not have been updated.
64-
receivedValue.ShouldBeNull();
41+
Should.NotThrow(() => _bus.Subscribe<string>(Callback));
6542
}
6643

6744
[Fact]
68-
internal async Task Subscription_Should_Be_Covariant()
45+
internal async Task Cancellation_Tokens_Should_Be_Passed_To_Subscribers()
6946
{
70-
// GIVEN we register a callback that handles Parent objects.
71-
Parent receivedParent = null;
72-
void Callback(Parent parent) => receivedParent = parent;
47+
// GIVEN a subscriber that accepts a cancellation token is registered to the bus.
48+
var cancellationRequested = false;
49+
var tcs = new TaskCompletionSource<object>();
7350

74-
using (_bus.SubscribeSync<Parent>(Callback))
51+
async Task Callback(int _, CancellationToken cancellationToken)
7552
{
76-
// WHEN we publish a child object.
77-
await _bus.Publish(new Child { Property = 5 });
78-
79-
// THEN the subscriber should have received the published child.
80-
receivedParent.ShouldNotBeNull();
81-
receivedParent.Property.ShouldBe(5);
53+
await tcs.Task;
54+
cancellationRequested = cancellationToken.IsCancellationRequested;
8255
}
83-
}
84-
85-
[Fact]
86-
internal async Task Subscribers_Should_Be_Notified_In_Order_Of_Registration()
87-
{
88-
// GIVEN two subscribers register to the bus.
89-
var tcs = new TaskCompletionSource<object>();
90-
var secondCallbackCalled = false;
91-
92-
Task CallbackA(int _) => tcs.Task;
93-
void CallbackB(int _) => secondCallbackCalled = true;
9456

95-
using (_bus.Subscribe<int>(CallbackA))
96-
using (_bus.SubscribeSync<int>(CallbackB))
57+
using (_bus.Subscribe<int>(Callback))
9758
{
98-
// WHEN we publish a message on the bus.
99-
var publicationTask = _bus.Publish(3);
59+
// WHEN we publish a message with a cancellation token.
60+
var cts = new CancellationTokenSource();
61+
var publicationTask = _bus.Publish(3, cts.Token);
10062

101-
// THEN the second callback should not have been called.
102-
secondCallbackCalled.ShouldBeFalse();
63+
// AND we signal cancellation.
64+
cts.Cancel();
10365

104-
// WHEN we signal completion of the first callback.
66+
// THEN the subscriber should be notified of that cancellation.
10567
tcs.SetResult(null);
106-
107-
// THEN the second callback should have been called.
10868
await publicationTask;
109-
secondCallbackCalled.ShouldBeTrue();
69+
cancellationRequested.ShouldBeTrue();
11070
}
11171
}
11272

@@ -123,7 +83,7 @@ async Task CallbackA(int _)
12383
firstSubscriberNotifiedTcs.SetResult(null);
12484
await firstSubscriberBlockTcs.Task;
12585
}
126-
86+
12787
void CallbackB(int _) => secondSubscriberNotified = true;
12888

12989
using (_bus.Subscribe<int>(CallbackA))
@@ -160,55 +120,114 @@ async Task CallbackA(int _)
160120
}
161121

162122
[Fact]
163-
internal async Task Cancellation_Tokens_Should_Be_Passed_To_Subscribers()
123+
internal async Task Exceptions_Should_Propagate_From_Subscribers_To_Publication_Task()
164124
{
165-
// GIVEN a subscriber that accepts a cancellation token is registered to the bus.
166-
var cancellationRequested = false;
167-
var tcs = new TaskCompletionSource<object>();
168-
async Task Callback(int _, CancellationToken cancellationToken)
125+
// GIVEN a subscriber that throws an exception when notified.
126+
void Callback(int _) => throw new ArgumentException("All numbers are terrible.");
127+
128+
using (_bus.SubscribeSync<int>(Callback))
169129
{
170-
await tcs.Task;
171-
cancellationRequested = cancellationToken.IsCancellationRequested;
130+
// EXCEPT an exception is thrown when we publish an integer on the bus.
131+
var exception = await Should.ThrowAsync<ArgumentException>(_bus.Publish(3));
132+
exception.Message.ShouldBe("All numbers are terrible.");
172133
}
134+
}
173135

174-
using (_bus.Subscribe<int>(Callback))
136+
[Fact]
137+
internal async Task Subscriber_Should_Not_Be_Called_After_Corresponding_Subscription_Token_Disposed()
138+
{
139+
// GIVEN we register a callback to update a local variable.
140+
int? receivedValue = null;
141+
void Callback(int value) => receivedValue = value;
142+
var subscriptionToken = _bus.SubscribeSync<int>(Callback);
143+
144+
// WHEN we publish a value.
145+
await _bus.Publish(3);
146+
147+
// THEN the local variable should be this value.
148+
receivedValue.ShouldBe(3);
149+
receivedValue = null;
150+
151+
// WHEN we dispose the subscription token.
152+
subscriptionToken.Dispose();
153+
154+
// AND we publish another value.
155+
await _bus.Publish(5);
156+
157+
// THEN the variable should not have been updated.
158+
receivedValue.ShouldBeNull();
159+
}
160+
161+
[Fact]
162+
internal async Task Subscribers_Should_Be_Notified_In_Order_Of_Registration()
163+
{
164+
// GIVEN two subscribers register to the bus.
165+
var tcs = new TaskCompletionSource<object>();
166+
var secondCallbackCalled = false;
167+
168+
Task CallbackA(int _) => tcs.Task;
169+
void CallbackB(int _) => secondCallbackCalled = true;
170+
171+
using (_bus.Subscribe<int>(CallbackA))
172+
using (_bus.SubscribeSync<int>(CallbackB))
175173
{
176-
// WHEN we publish a message with a cancellation token.
177-
var cts = new CancellationTokenSource();
178-
var publicationTask = _bus.Publish(3, cts.Token);
174+
// WHEN we publish a message on the bus.
175+
var publicationTask = _bus.Publish(3);
179176

180-
// AND we signal cancellation.
181-
cts.Cancel();
177+
// THEN the second callback should not have been called.
178+
secondCallbackCalled.ShouldBeFalse();
182179

183-
// THEN the subscriber should be notified of that cancellation.
180+
// WHEN we signal completion of the first callback.
184181
tcs.SetResult(null);
182+
183+
// THEN the second callback should have been called.
185184
await publicationTask;
186-
cancellationRequested.ShouldBeTrue();
185+
secondCallbackCalled.ShouldBeTrue();
187186
}
188-
189187
}
190188

191189
[Fact]
192-
internal async Task Exceptions_Should_Propagate_From_Subscribers_To_Publication_Task()
190+
internal async Task Subscription_Should_Be_Contravariant()
193191
{
194-
// GIVEN a subscriber that throws an exception when notified.
195-
void Callback(int _) => throw new ArgumentException("All numbers are terrible.");
192+
// GIVEN we register a callback that handles Parent objects.
193+
Parent receivedParent = null;
194+
void Callback(Parent parent) => receivedParent = parent;
196195

197-
using (_bus.SubscribeSync<int>(Callback))
196+
using (_bus.SubscribeSync<Parent>(Callback))
198197
{
199-
// EXCEPT an exception is thrown when we publish an integer on the bus.
200-
var exception = await Should.ThrowAsync<ArgumentException>(_bus.Publish(3));
201-
exception.Message.ShouldBe("All numbers are terrible.");
198+
// WHEN we publish a child object.
199+
await _bus.Publish(new Child { Property = 5 });
200+
201+
// THEN the subscriber should have received the published child.
202+
receivedParent.ShouldNotBeNull();
203+
receivedParent.Property.ShouldBe(5);
202204
}
203205
}
204206

205-
private class Parent
207+
[Fact]
208+
internal async Task Subscription_Should_Not_Be_Covariant()
206209
{
207-
public int Property { get; set; }
210+
// GIVEN we register a callback that handles Child objects.
211+
Child receivedChild = null;
212+
void Callback(Child child) => receivedChild = child;
213+
214+
using (_bus.SubscribeSync<Child>(Callback))
215+
{
216+
// WHEN we publish a parent object.
217+
await _bus.Publish(new Parent { Property = 5 });
218+
219+
// THEN the subscriber should not have received the published parent.
220+
receivedChild.ShouldBeNull();
221+
}
208222
}
209223

210224
private class Child : Parent
211225
{
212226
}
227+
228+
private class Parent
229+
{
230+
public int Property { get; set; }
231+
}
213232
}
214233
}

‎AsyncBus.Tests/ObservableSpec.cs

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Reactive.Linq;
4+
using System.Threading.Tasks;
5+
using Shouldly;
6+
using Xunit;
7+
8+
namespace AsyncBus.Tests
9+
{
10+
using ObservableSetup = Func<IObservable<int>, IObservable<int>>;
11+
12+
public class ObservableSpec
13+
{
14+
private readonly IBus _bus;
15+
16+
public ObservableSpec()
17+
{
18+
_bus = BusSetup.CreateBus();
19+
}
20+
21+
// ReSharper disable once MemberCanBePrivate.Global
22+
public static IEnumerable<object[]> RxExamples
23+
{
24+
get
25+
{
26+
yield return new object[]
27+
{
28+
new[] { 1, 2, 3, 4, 5, 6 },
29+
new ObservableSetup(observable => observable),
30+
new[] { 1, 2, 3, 4, 5, 6 }
31+
};
32+
33+
yield return new object[]
34+
{
35+
new[] { 1, 2, 2, 2, 3, 2, 2, 4 },
36+
new ObservableSetup(observable => observable.Distinct()),
37+
new[] { 1, 2, 3, 4 }
38+
};
39+
40+
yield return new object[]
41+
{
42+
new[] { 1, 2, 2, 2, 3, 2, 2, 4 },
43+
new ObservableSetup(observable => observable.DistinctUntilChanged()),
44+
new[] { 1, 2, 3, 2, 4 }
45+
};
46+
47+
yield return new object[]
48+
{
49+
new[] { 1, 2, 3, 4 },
50+
new ObservableSetup(observable => observable.Scan(0, (x, y) => x + y)),
51+
new[] { 1, 3, 6, 10 }
52+
};
53+
54+
yield return new object[]
55+
{
56+
new[] { 1, 2, 3, 4 },
57+
new ObservableSetup(observable => observable.Select(x => x * 2).Select(x => x - 5)),
58+
new[] { -3, -1, 1, 3 }
59+
};
60+
}
61+
}
62+
63+
[Fact]
64+
internal async Task Bus_Should_Support_Multiple_Observers()
65+
{
66+
var input = new[] { 1, 2, 3, 4 };
67+
var outputA = new List<int>();
68+
var outputB = new List<int>();
69+
70+
// GIVEN the bus has two observers
71+
using (_bus.Observe<int>().Subscribe(outputA.Add))
72+
using (_bus.Observe<int>().Subscribe(outputB.Add))
73+
{
74+
// WHEN we publish a sequence of messages.
75+
foreach (var message in input)
76+
{
77+
await _bus.Publish(message);
78+
}
79+
80+
// THEN both observers should have received the messages.
81+
outputA.ShouldBe(input);
82+
outputB.ShouldBe(input);
83+
}
84+
}
85+
86+
[Theory]
87+
[MemberData(nameof(RxExamples))]
88+
internal async Task Bus_Should_Support_Observation(int[] input, ObservableSetup setup, int[] expectedOutput)
89+
{
90+
var output = new List<int>();
91+
92+
using (setup(_bus.Observe<int>()).Subscribe(output.Add))
93+
{
94+
foreach (var message in input)
95+
{
96+
await _bus.Publish(message);
97+
}
98+
99+
output.ShouldBe(expectedOutput);
100+
}
101+
}
102+
103+
[Fact]
104+
internal void Observables_Returned_By_Bus_Should_Throw_Exception_On_Resubscription()
105+
{
106+
void ExampleCallback(int _)
107+
{
108+
}
109+
110+
var observable = _bus.Observe<int>();
111+
Should.NotThrow(() => observable.Subscribe(ExampleCallback));
112+
Should.Throw<InvalidOperationException>(() => observable.Subscribe(ExampleCallback));
113+
}
114+
115+
[Fact]
116+
internal async Task Observers_Should_Not_Receive_Messages_After_Disposing_Subscription()
117+
{
118+
// GIVEN the bus has two observers.
119+
var outputA = new List<int>();
120+
var outputB = new List<int>();
121+
var subscriptionA = _bus.Observe<int>().Subscribe(outputA.Add);
122+
var subscriptionB = _bus.Observe<int>().Subscribe(outputB.Add);
123+
124+
// EXPECT that observers only receive updates until they dispose their subscription.
125+
await _bus.Publish(1);
126+
outputA.ShouldBe(new[] { 1 });
127+
outputB.ShouldBe(new[] { 1 });
128+
129+
subscriptionA.Dispose();
130+
await _bus.Publish(2);
131+
outputA.ShouldBe(new[] { 1 });
132+
outputB.ShouldBe(new[] { 1, 2 });
133+
134+
subscriptionB.Dispose();
135+
await _bus.Publish(3);
136+
outputA.ShouldBe(new[] { 1 });
137+
outputB.ShouldBe(new[] { 1, 2 });
138+
}
139+
}
140+
}

‎AsyncBus/Subscription.cs ‎AsyncBus/ActionSubscription.cs

+15-12
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44

55
namespace AsyncBus
66
{
7-
/// <inheritdoc />
8-
internal sealed class Subscription<T> : ISubscription
7+
/// <summary>
8+
/// Represents a type of <see cref="ISubscription" /> that performs an asynchronous action when processing messages of the
9+
/// correct type.
10+
/// </summary>
11+
/// <typeparam name="T">The type of messages to handle.</typeparam>
12+
internal sealed class ActionSubscription<T> : ISubscription
913
{
1014
private readonly Func<T, CancellationToken, Task> _callback;
1115

1216
private bool _disposed;
1317

14-
public Subscription(Func<T, CancellationToken, Task> callback)
18+
public ActionSubscription(Func<T, CancellationToken, Task> callback)
1519
{
1620
_callback = callback ?? throw new ArgumentNullException(nameof(_callback));
1721
}
@@ -20,30 +24,29 @@ public Subscription(Func<T, CancellationToken, Task> callback)
2024
public event EventHandler Disposed;
2125

2226
/// <inheritdoc />
23-
public bool CanProcessMessage(object message)
24-
=> message is T && !_disposed;
27+
public bool CanProcessMessage(object message) => message is T && !_disposed;
2528

2629
/// <inheritdoc />
27-
public Task ProcessMessage(object message, CancellationToken cancellationToken)
30+
public void Dispose()
2831
{
2932
if (_disposed)
3033
{
31-
throw new ObjectDisposedException(GetType().FullName);
34+
return;
3235
}
3336

34-
return _callback((T)message, cancellationToken);
37+
Disposed?.Invoke(this, EventArgs.Empty);
38+
_disposed = true;
3539
}
3640

3741
/// <inheritdoc />
38-
public void Dispose()
42+
public Task ProcessMessage(object message, CancellationToken cancellationToken)
3943
{
4044
if (_disposed)
4145
{
42-
return;
46+
throw new ObjectDisposedException(GetType().FullName);
4347
}
4448

45-
Disposed?.Invoke(this, EventArgs.Empty);
46-
_disposed = true;
49+
return _callback((T)message, cancellationToken);
4750
}
4851
}
4952
}

‎AsyncBus/AsyncBus.csproj

+6-1
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,15 @@
66
<GenerateDocumentationFile>true</GenerateDocumentationFile>
77
<AssemblyName>AsyncBus</AssemblyName>
88
<PackageId>AsyncBus</PackageId>
9+
<Authors>ThymineC</Authors>
10+
<Copyright>MIT</Copyright>
911
<RepositoryType>git</RepositoryType>
1012
<RepositoryUrl>https://github.com/TAGC/AsyncBus</RepositoryUrl>
1113
</PropertyGroup>
1214
<ItemGroup>
13-
<DotNetCliToolReference Include="dotnet-setversion" Version="*" />
15+
<DotNetCliToolReference Include="dotnet-setversion" Version="1.*" />
16+
</ItemGroup>
17+
<ItemGroup>
18+
<PackageReference Include="System.Reactive" Version="4.0.0-preview00001" />
1419
</ItemGroup>
1520
</Project>

‎AsyncBus/Bus.cs

+19-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Threading;
45
using System.Threading.Tasks;
56

@@ -15,10 +16,22 @@ public Bus()
1516
_subscriptions = new List<ISubscription>();
1617
}
1718

19+
/// <inheritdoc />
20+
public IObservable<T> Observe<T>()
21+
{
22+
var subscription = new ObservableSubscription<T>();
23+
subscription.Disposed += (s, e) => _subscriptions.Remove(subscription);
24+
_subscriptions.Add(subscription);
25+
26+
return subscription;
27+
}
28+
1829
/// <inheritdoc />
1930
public async Task Publish(object message, CancellationToken cancellationToken = default)
2031
{
21-
foreach (var subscription in _subscriptions)
32+
var temp = _subscriptions.ToList();
33+
34+
foreach (var subscription in temp)
2235
{
2336
cancellationToken.ThrowIfCancellationRequested();
2437

@@ -32,19 +45,19 @@ public async Task Publish(object message, CancellationToken cancellationToken =
3245
/// <inheritdoc />
3346
public IDisposable Subscribe<T>(Func<T, CancellationToken, Task> callback)
3447
{
35-
var subscription = new Subscription<T>(callback);
48+
var subscription = new ActionSubscription<T>(callback);
3649
subscription.Disposed += (s, e) => _subscriptions.Remove(subscription);
3750
_subscriptions.Add(subscription);
3851

3952
return subscription;
4053
}
4154

4255
/// <inheritdoc />
43-
public IDisposable Subscribe<T>(Func<T, Task> callback)
44-
=> Subscribe<T>((message, cancellationToken) => callback(message));
56+
public IDisposable Subscribe<T>(Func<T, Task> callback) =>
57+
Subscribe<T>((message, cancellationToken) => callback(message));
4558

4659
/// <inheritdoc />
47-
public IDisposable SubscribeSync<T>(Action<T> callback)
48-
=> Subscribe<T>(message => Task.Run(() => callback(message)));
60+
public IDisposable SubscribeSync<T>(Action<T> callback) =>
61+
Subscribe<T>(message => Task.Run(() => callback(message)));
4962
}
5063
}

‎AsyncBus/IBus.cs

+30-16
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,21 @@ namespace AsyncBus
99
/// </summary>
1010
public interface IBus
1111
{
12+
/// <summary>
13+
/// Returns an <see cref="IObservable{T}" /> that allows clients to observe all messages of type
14+
/// <typeparamref name="T" /> published on the bus.
15+
/// <para />
16+
/// Each <c>IObservable</c> returned by this method supports only one subscriber. Callers should instead create additional
17+
/// instances of <c>IObservable</c> through this method if required.
18+
/// </summary>
19+
/// <typeparam name="T">The type of message to observe.</typeparam>
20+
/// <returns>An observable.</returns>
21+
IObservable<T> Observe<T>();
22+
1223
/// <summary>
1324
/// Publishes a message on the bus.
14-
/// <para/>
15-
/// Any subscribers registered to the actual type of <paramref name="message"/> or a supertype
25+
/// <para />
26+
/// Any subscribers registered to the runtime type of <paramref name="message" /> or a supertype
1627
/// will be notified of the message.
1728
/// </summary>
1829
/// <param name="message">The message to publish on the bus.</param>
@@ -25,41 +36,44 @@ public interface IBus
2536
Task Publish(object message, CancellationToken cancellationToken = default);
2637

2738
/// <summary>
28-
/// Registers a subscriber to messages of type <typeparamref name="T"/>.
29-
/// <para/>
30-
/// Message subscription is contravariant; subscribers of messages of type <typeparamref name="T"/>
31-
/// will also receive all messages that are a superclass of that type.
32-
/// <para/>
39+
/// Registers a subscriber to messages of type <typeparamref name="T" />.
40+
/// <para />
41+
/// Message subscription is contravariant; subscribers of messages of type <typeparamref name="T" />
42+
/// will also receive all messages that are a subclass of that type.
43+
/// <para />
3344
/// Subscribers will be passed both the message and a cancellation token that clients may use to signal
3445
/// cancellation of the message publication.
3546
/// </summary>
3647
/// <param name="callback">
3748
/// The asynchronous action to perform when an appropriate message is published on the bus.
3849
/// </param>
50+
/// <typeparam name="T">The type of message to subscribe to.</typeparam>
3951
/// <returns>A token that can be disposed to unregister this subscriber.</returns>
4052
IDisposable Subscribe<T>(Func<T, CancellationToken, Task> callback);
4153

4254
/// <summary>
43-
/// Registers a subscriber to messages of type <typeparamref name="T"/>.
44-
/// <para/>
45-
/// Message subscription is contravariant; subscribers of messages of type <typeparamref name="T"/>
46-
/// will also receive all messages that are a superclass of that type.
55+
/// Registers a subscriber to messages of type <typeparamref name="T" />.
56+
/// <para />
57+
/// Message subscription is contravariant; subscribers of messages of type <typeparamref name="T" />
58+
/// will also receive all messages that are a subclass of that type.
4759
/// </summary>
4860
/// <param name="callback">
4961
/// The asynchronous action to perform when an appropriate message is published on the bus.
5062
/// </param>
63+
/// <typeparam name="T">The type of message to subscribe to.</typeparam>
5164
/// <returns>A token that can be disposed to unregister this subscriber.</returns>
5265
IDisposable Subscribe<T>(Func<T, Task> callback);
53-
66+
5467
/// <summary>
55-
/// Registers a subscriber to messages of type <typeparamref name="T"/>.
56-
/// <para/>
57-
/// Message subscription is contravariant; subscribers of messages of type <typeparamref name="T"/>
58-
/// will also receive all messages that are a superclass of that type.
68+
/// Registers a subscriber to messages of type <typeparamref name="T" />.
69+
/// <para />
70+
/// Message subscription is contravariant; subscribers of messages of type <typeparamref name="T" />
71+
/// will also receive all messages that are a subclass of that type.
5972
/// </summary>
6073
/// <param name="callback">
6174
/// The synchronous action to perform when an appropriate message is published on the bus.
6275
/// </param>
76+
/// <typeparam name="T">The type of message to subscribe to.</typeparam>
6377
/// <returns>A token that can be disposed to unregister this subscriber.</returns>
6478
IDisposable SubscribeSync<T>(Action<T> callback);
6579
}

‎AsyncBus/ObservableSubscription.cs

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace AsyncBus
6+
{
7+
/// <summary>
8+
/// Represents a type of <see cref="ISubscription" /> that notifies observers when messages are received of the correct
9+
/// type.
10+
/// </summary>
11+
/// <typeparam name="T">The type of messages to handle.</typeparam>
12+
internal sealed class ObservableSubscription<T> : ISubscription, IObservable<T>
13+
{
14+
private bool _disposed;
15+
private IObserver<T> _observer;
16+
17+
/// <inheritdoc />
18+
public event EventHandler Disposed;
19+
20+
/// <inheritdoc />
21+
public bool CanProcessMessage(object message) => message is T && !_disposed;
22+
23+
/// <inheritdoc />
24+
public void Dispose()
25+
{
26+
if (_disposed)
27+
{
28+
return;
29+
}
30+
31+
Disposed?.Invoke(this, EventArgs.Empty);
32+
_observer = null;
33+
_disposed = true;
34+
}
35+
36+
/// <inheritdoc />
37+
public Task ProcessMessage(object message, CancellationToken cancellationToken)
38+
{
39+
if (_disposed)
40+
{
41+
throw new ObjectDisposedException(GetType().FullName);
42+
}
43+
44+
_observer?.OnNext((T)message);
45+
46+
return Task.CompletedTask;
47+
}
48+
49+
/// <inheritdoc />
50+
/// <exception cref="InvalidOperationException">This instance already has an observer.</exception>
51+
public IDisposable Subscribe(IObserver<T> observer)
52+
{
53+
if (_observer != null)
54+
{
55+
throw new InvalidOperationException(
56+
"This observable already has an observer and does not support multiple observers");
57+
}
58+
59+
_observer = observer;
60+
return this;
61+
}
62+
}
63+
}

‎GitVersion.yml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
next-version: 0.2.0
2+
branches:
3+
master:
4+
increment: None
5+
releases?[/-]:
6+
increment: None
7+
feature:
8+
increment: None
9+
pull-request:
10+
increment: None
11+
hotfix:
12+
increment: None
13+
support:
14+
increment: None
15+
develop:
16+
increment: None
17+
ignore:
18+
sha: []

‎async-bus.sln

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
Microsoft Visual Studio Solution File, Format Version 12.00
22
# Visual Studio 15
3-
VisualStudioVersion = 15.0.27130.2027
3+
VisualStudioVersion = 15.0.27130.2036
44
MinimumVisualStudioVersion = 15.0.26124.0
55
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AsyncBus.Tests", "AsyncBus.Tests\AsyncBus.Tests.csproj", "{2BF57125-F558-46F7-87E6-03BAE4A36178}"
66
EndProject
77
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AsyncBus", "AsyncBus\AsyncBus.csproj", "{AC1DC16F-C8B5-47A9-AA44-60F406493F0A}"
88
EndProject
9+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{D76130FE-E77A-4843-AD45-D2049A285F38}"
10+
ProjectSection(SolutionItems) = preProject
11+
appveyor.yml = appveyor.yml
12+
GitVersion.yml = GitVersion.yml
13+
EndProjectSection
14+
EndProject
915
Global
1016
GlobalSection(SolutionConfigurationPlatforms) = preSolution
1117
Debug|Any CPU = Debug|Any CPU

0 commit comments

Comments
 (0)
Please sign in to comment.