diff --git a/src/DynamicData.Tests/Cache/FilterControllerFixture.cs b/src/DynamicData.Tests/Cache/FilterControllerFixture.cs deleted file mode 100644 index ebe073240..000000000 --- a/src/DynamicData.Tests/Cache/FilterControllerFixture.cs +++ /dev/null @@ -1,316 +0,0 @@ -using System; -using System.Linq; -using System.Reactive; -using System.Reactive.Linq; -using System.Reactive.Subjects; - -using DynamicData.Tests.Domain; - -using FluentAssertions; - -using Xunit; - -namespace DynamicData.Tests.Cache; - -public class FilterControllerFixture : IDisposable -{ - private readonly ISubject> _filter; - - private readonly ChangeSetAggregator _results; - - private readonly ISourceCache _source; - - public FilterControllerFixture() - { - _source = new SourceCache(p => p.Key); - _filter = new BehaviorSubject>(p => p.Age > 20); - _results = new ChangeSetAggregator(_source.Connect().Filter(_filter)); - } - - /* Should be the same as standard lambda filter */ - - [Fact] - public void AddMatched() - { - var person = new Person("Adult1", 50); - _source.AddOrUpdate(person); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Data.Count.Should().Be(1, "Should be 1 item in the cache"); - _results.Data.Items[0].Should().Be(person, "Should be same person"); - } - - [Fact] - public void AddNotMatched() - { - var person = new Person("Adult1", 10); - _source.AddOrUpdate(person); - - _results.Messages.Count.Should().Be(0, "Should have no item updates"); - _results.Data.Count.Should().Be(0, "Cache should have no items"); - } - - [Fact] - public void AddNotMatchedAndUpdateMatched() - { - const string key = "Adult1"; - var notmatched = new Person(key, 19); - var matched = new Person(key, 21); - - _source.Edit( - innerCache => - { - innerCache.AddOrUpdate(notmatched); - innerCache.AddOrUpdate(matched); - }); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].First().Current.Should().Be(matched, "Should be same person"); - _results.Data.Items[0].Should().Be(matched, "Should be same person"); - } - - [Fact] - public void AttemptedRemovalOfANonExistentKeyWillBeIgnored() - { - const string key = "Adult1"; - _source.Remove(key); - _results.Messages.Count.Should().Be(0, "Should be 0 updates"); - } - - [Fact] - public void BatchOfUniqueUpdates() - { - var people = Enumerable.Range(1, 100).Select(i => new Person("Name" + i, i)).ToArray(); - - _source.AddOrUpdate(people); - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should return 80 adds"); - - var filtered = people.Where(p => p.Age > 20).OrderBy(p => p.Age).ToArray(); - _results.Data.Items.OrderBy(p => p.Age).Should().BeEquivalentTo(filtered, "Incorrect Filter result"); - } - - [Fact] - public void BatchRemoves() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - - _source.AddOrUpdate(people); - _source.Remove(people); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should be 80 addes"); - _results.Messages[1].Removes.Should().Be(80, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); - } - - [Fact] - public void BatchSuccessiveUpdates() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - foreach (var person in people) - { - var person1 = person; - _source.AddOrUpdate(person1); - } - - _results.Messages.Count.Should().Be(80, "Should be 80 messages"); - _results.Data.Count.Should().Be(80, "Should be 80 in the cache"); - var filtered = people.Where(p => p.Age > 20).OrderBy(p => p.Age).ToArray(); - _results.Data.Items.OrderBy(p => p.Age).Should().BeEquivalentTo(filtered, "Incorrect Filter result"); - } - - [Fact] - public void ChangeFilter() - { - var people = Enumerable.Range(1, 100).Select(i => new Person("P" + i, i)).ToArray(); - - _source.AddOrUpdate(people); - _results.Data.Count.Should().Be(80, "Should be 80 people in the cache"); - - _filter.OnNext(p => p.Age <= 50); - _results.Data.Count.Should().Be(50, "Should be 50 people in the cache"); - _results.Messages.Count.Should().Be(2, "Should be 2 update messages"); - _results.Messages[1].Removes.Should().Be(50, "Should be 50 removes in the second message"); - _results.Messages[1].Adds.Should().Be(20, "Should be 20 adds in the second message"); - - _results.Data.Items.All(p => p.Age <= 50).Should().BeTrue(); - } - - [Fact] - public void Clear() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - _source.AddOrUpdate(people); - _source.Clear(); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should be 80 addes"); - _results.Messages[1].Removes.Should().Be(80, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); - } - - public void Dispose() - { - _source.Dispose(); - _results.Dispose(); - } - - [Fact] - public void ReapplyFilterDoesntThrow() - { - using var source = new SourceCache(p => p.Key); - source.AddOrUpdate(Enumerable.Range(1, 100).Select(i => new Person("P" + i, i)).ToArray()); - - var ex = Record.Exception(() => source.Connect().Filter(Observable.Return(Unit.Default)).AsObservableCache()); - Assert.Null(ex); - } - - [Fact] - public void ReevaluateFilter() - { - //re-evaluate for inline changes - var people = Enumerable.Range(1, 100).Select(i => new Person("P" + i, i)).ToArray(); - - _source.AddOrUpdate(people); - _results.Data.Count.Should().Be(80, "Should be 80 people in the cache"); - - foreach (var person in people) - { - person.Age += 10; - } - - _filter.OnNext(p => p.Age > 20); - - _results.Data.Count.Should().Be(90, "Should be 90 people in the cache"); - _results.Messages.Count.Should().Be(2, "Should be 2 update messages"); - _results.Messages[1].Adds.Should().Be(10, "Should be 10 adds in the second message"); - - foreach (var person in people) - { - person.Age -= 10; - } - - _filter.OnNext(p => p.Age > 20); - - _results.Data.Count.Should().Be(80, "Should be 80 people in the cache"); - _results.Messages.Count.Should().Be(3, "Should be 3 update messages"); - _results.Messages[2].Removes.Should().Be(10, "Should be 10 removes in the third message"); - } - - [Fact] - public void Remove() - { - const string key = "Adult1"; - var person = new Person(key, 50); - - _source.AddOrUpdate(person); - _source.Remove(key); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 80 addes"); - _results.Messages[1].Removes.Should().Be(1, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); - } - - [Fact] - public void RepeatedApply() - { - using var source = new SourceCache(p => p.Key); - source.AddOrUpdate(Enumerable.Range(1, 100).Select(i => new Person("P" + i, i)).ToArray()); - _filter.OnNext(p => true); - - IChangeSet? latestChanges = null; - using (source.Connect().Filter(_filter).Do(changes => latestChanges = changes).AsObservableCache()) - { - if (latestChanges is null) - { - throw new InvalidOperationException(nameof(latestChanges)); - } - - _filter.OnNext(p => false); - latestChanges.Removes.Should().Be(100); - latestChanges.Adds.Should().Be(0); - - _filter.OnNext(p => true); - latestChanges.Adds.Should().Be(100); - latestChanges.Removes.Should().Be(0); - - _filter.OnNext(p => false); - latestChanges.Removes.Should().Be(100); - latestChanges.Adds.Should().Be(0); - - _filter.OnNext(p => true); - latestChanges.Adds.Should().Be(100); - latestChanges.Removes.Should().Be(0); - - _filter.OnNext(p => false); - latestChanges.Removes.Should().Be(100); - latestChanges.Adds.Should().Be(0); - } - } - - [Fact] - public void SameKeyChanges() - { - const string key = "Adult1"; - - _source.Edit( - updater => - { - updater.AddOrUpdate(new Person(key, 50)); - updater.AddOrUpdate(new Person(key, 52)); - updater.AddOrUpdate(new Person(key, 53)); - updater.Remove(key); - }); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 adds"); - _results.Messages[0].Updates.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Removes.Should().Be(1, "Should be 1 remove"); - } - - [Fact] - public void UpdateMatched() - { - const string key = "Adult1"; - var newperson = new Person(key, 50); - var updated = new Person(key, 51); - - _source.AddOrUpdate(newperson); - _source.AddOrUpdate(updated); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 adds"); - _results.Messages[1].Updates.Should().Be(1, "Should be 1 update"); - } - - [Fact] - public void UpdateNotMatched() - { - const string key = "Adult1"; - var newperson = new Person(key, 10); - var updated = new Person(key, 11); - - _source.AddOrUpdate(newperson); - _source.AddOrUpdate(updated); - - _results.Messages.Count.Should().Be(0, "Should be no updates"); - _results.Data.Count.Should().Be(0, "Should nothing cached"); - } - - [Fact] - public void EmptyChanges() - { - IChangeSet? change = null; - - //need to also apply overload on connect as that will also need to provide and empty notification - using var subscription = _source.Connect(suppressEmptyChangeSets: false) - .Filter(_filter, false) - .Subscribe(c => change = c); - - change.Should().NotBeNull(); - change!.Count.Should().Be(0); - } -} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.Base.cs b/src/DynamicData.Tests/Cache/FilterFixture.Base.cs new file mode 100644 index 000000000..dad57e373 --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.Base.cs @@ -0,0 +1,399 @@ +using System; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class FilterFixture +{ + public abstract class Base + { + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ExcludedItemsAreRemoved_NoChangesAreMade(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have been added"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Remove(source.Items.Where(static item => !item.IsIncluded).ToArray()); + + results.Error.Should().BeNull(); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + { + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "only excluded items were manipulated"); + } + else + { + results.RecordedChangeSets.Skip(1).Should().BeEmpty("empty changesets should be suppressed"); + } + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ItemsAreAdded_MatchingItemsPropagate(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Intialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have been added"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ItemsAreMoved_MovementsAreIgnored(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new Subject>(); + + var items = new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true } + }; + + // UUT Initialization + using var subscription = BuildUut( + source: source + .Prepend(new ChangeSet(items + .Select((item, index) => new Change( + reason: ChangeReason.Add, + key: item.Id, + current: item, + index: index)))), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "all matching items should have been added"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Moved, key: items[2].Id, items[2], previous: default, currentIndex: 0, previousIndex: 2), + new(reason: ChangeReason.Moved, key: items[1].Id, items[1], previous: default, currentIndex: 1, previousIndex: 2) + }); + + results.Error.Should().BeNull(); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + { + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source opreation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "no changes should have been made"); + } + else + { + results.RecordedChangeSets.Skip(1).Should().BeEmpty("empty changesets should be suppressed"); + } + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ItemsAreRefreshed_ItemsAreReFilteredOrRefreshed(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (add items) + foreach (var item in source.Items) + item.IsIncluded = true; + + source.Refresh(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedChangeSets.ElementAt(1).ShouldHaveRefreshed(source.Items.Take(3), "all unchanged items should have been refreshed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all newly-matching items should have been added"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (remove items) + foreach (var item in source.Items.Take(3)) + item.IsIncluded = false; + + source.Refresh(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedChangeSets.ElementAt(2).ShouldHaveRefreshed(source.Items.Skip(3), "all unchanged items should have been refreshed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all newly-excluded items should have been removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + public void ItemsAreUpdated_ItemsAreReFiltered(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Intialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (add and update items) + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = true }, + new Item() { Id = 5, IsIncluded = true }, + new Item() { Id = 6, IsIncluded = true } + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all newly-matching items should have been added"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action (remove and update items) + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = false }, + new Item() { Id = 2, IsIncluded = false }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = true }, + new Item() { Id = 5, IsIncluded = true }, + new Item() { Id = 6, IsIncluded = true } + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all newly-excluded items should have been removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void MatchingItemsAreRemoved_RemovalsPropagate(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Remove(source.Items.Where(Item.FilterByIsIncluded).ToArray()); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEmpty("all matching items were removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void SourceFails_ErrorPropagates(CompletionStrategy completionStrategy) + { + var error = new Exception("Test"); + + using var source = new TestSourceCache(Item.SelectId); + + if (completionStrategy is CompletionStrategy.Immediate) + source.SetError(error); + + using var subscription = BuildUut( + source: source.Connect(), + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: true) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.SetError(error); + + results.Error.Should().Be(error); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Fact] + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(() => BuildUut( + source: null!, + predicate: Item.FilterByIsIncluded, + suppressEmptyChangeSets: true)) + .Should() + .Throw(); + + protected abstract IObservable> BuildUut( + IObservable> source, + Func predicate, + bool suppressEmptyChangeSets); + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicate.IntegrationTests.cs b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicate.IntegrationTests.cs new file mode 100644 index 000000000..8b5c76e4c --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicate.IntegrationTests.cs @@ -0,0 +1,76 @@ +using System; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading.Tasks; + +using Bogus; +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class FilterFixture +{ + public static partial class DynamicPredicate + { + public sealed class IntegrationTests + : IntegrationTestFixtureBase + { + [Fact(Timeout = 60_000)] + public async Task NotificationsOccurOnDifferentThreads_OperatorIsThreadSafe() + { + // Setup + var randomizer = new Randomizer(0x1234567); + + (var items, var changeSets) = GenerateStressItemsAndChangeSets( + editCount: 5_000, + maxChangeCount: 20, + randomizer: randomizer); + + var predicates = GenerateRandomIdInclusionMasks( + valueCount: 5_000, + randomizer: randomizer) + .Select(mask => new Func(item => Item.FilterByIdInclusionMask(mask, item))) + .ToArray(); + + using var source = new Subject>(); + using var predicateChanged = new Subject>(); + + + // UUT Initialization + using var subscription = source + .Filter(predicateChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + + // UUT Action + await Task.WhenAll( + Task.Run(() => + { + foreach (var changeSet in changeSets) + source.OnNext(changeSet); + }), + Task.Run(() => + { + foreach (var predicate in predicates) + predicateChanged.OnNext(predicate); + })); + + var finalPredicate = predicates[^1]; + + results.Error.Should().BeNull(); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items.Items.Where(finalPredicate), "the source colleciton should be filtered to include only items matching the final predicate"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + } + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicate.UnitTests.cs b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicate.UnitTests.cs new file mode 100644 index 000000000..5315727d1 --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicate.UnitTests.cs @@ -0,0 +1,408 @@ +using System; +using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class FilterFixture +{ + public static partial class DynamicPredicate + { + public sealed class UnitTests + : Base + { + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ChangesAreMadeBeforeInitialPredicateChangedValue_ItemsAreExcluded(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter( + predicateChanged: Observable.Never>(), + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + // Add changes + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + // Refresh changes, with no item mutations. + source.Refresh(); + + // Refresh changes, with item mutations affecting filtering. + foreach (var item in source.Items) + item.IsIncluded = !item.IsIncluded; + source.Refresh(); + + // Remove changes + source.RemoveKeys(new[] { 2, 3 }); + + // Update changes, not affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = true } + }); + + // Update changes, affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false } + }); + + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + { + results.RecordedChangeSets.Count.Should().Be(6, "6 source operations were performed"); + results.RecordedItemsByKey.Should().BeEmpty("the predicate has not initialized"); + } + else + { + results.RecordedChangeSets.Should().BeEmpty("empty changesets should be suppressed"); + } + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void PredicateChangedChanges_ItemsAreReFiltered(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + using var predicateChanged = new BehaviorSubject>(Item.FilterByIsIncluded); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + predicateChanged.OnNext(Item.FilterByEvenId); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 predicate change occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByEvenId), "newly-matching items should have been added, and newly-excluded items should have been removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void PredicateChangedCompletesAfterInitialValue_CompletionWaitsForSourceCompletion(CompletionStrategy completionStrategy) + { + // Setup + var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + var predicateChanged = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject>() + : Observable.Return(Item.FilterByIsIncluded); + + var reapplyFilter = new Subject(); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter(predicateChanged) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (predicateChanged is Subject> subject) + { + subject.OnNext(Item.FilterByIsIncluded); + subject.OnCompleted(); + } + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial predicate, was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("changes could still be generated by the source"); + + + // UUT Action + source.Complete(); + + results.Error.Should().BeNull(); + if (completionStrategy is CompletionStrategy.Asynchronous) + results.RecordedChangeSets.Skip(2).Should().BeEmpty("no source operations were performed"); + else + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all source streams have completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.SuppressEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void PredicateChangedCompletesBeforeInitialValue_CompletionPropagatesIfEmptyChangesetsAreSuppressed( + CompletionStrategy completionStrategy, + EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var predicateChanged = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject>() + : Observable.Empty>(); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (predicateChanged is Subject> subject) + subject.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + results.HasCompleted.Should().BeFalse("additional empty changesets can occur"); + else + results.HasCompleted.Should().BeTrue("only empty changesets can occur"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void PredicateChangedFails_ErrorPropagates(CompletionStrategy completionStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var error = new Exception("Test"); + + var predicateChanged = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject>() + : Observable.Throw>(error); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (predicateChanged is Subject> subject) + subject.OnError(error); + + results.Error.Should().Be(error, "errors should propagate downstream"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Fact] + public void PredicateChangedIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.Filter( + source: Observable.Return(ChangeSet.Empty), + reapplyFilter: Observable.Never(), + predicateChanged: null!)) + .Should() + .Throw(); + + [Theory] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.SuppressEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void SourceCompletesWhenEmpty_CompletionPropagatesWhenEmptyChangesetsAreSuppressed( + CompletionStrategy completionStrategy, + EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + // UUT Initialization & Action + if (completionStrategy is CompletionStrategy.Immediate) + source.Complete(); + + using var subscription = source.Connect() + .Filter( + predicateChanged: Observable.Concat( + Observable.Return(Item.FilterByIsIncluded), + Observable.Never>()), + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + results.HasCompleted.Should().BeFalse("the source has completed, but further empty changesets can occur"); + else + results.HasCompleted.Should().BeTrue("the source has completed, and no further changesets can occur"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void SourceCompletesWhenNotEmpty_CompletionWaitsForPredicateChangedCompletion(CompletionStrategy completionStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + using var predicateChanged = new BehaviorSubject>(Item.FilterByIsIncluded); + + + // UUT Initialization & Action + if (completionStrategy is CompletionStrategy.Immediate) + source.Complete(); + + using var subscription = source.Connect() + .Filter(predicateChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the collection could still change due to new predicates"); + + + // UUT Action + predicateChanged.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "no changes should have been made"); + results.HasCompleted.Should().BeTrue("all source streams have completed"); + + + // Final Verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Fact] + public void SubscriptionIsDisposed_SubscriptionDisposalPropagates() + { + // Setup + using var source = new Subject>(); + using var predicateChanged = new BehaviorSubject>(Item.FilterByIsIncluded); + + + // UUT Initialization + using var subscription = source + .Filter(predicateChanged) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + subscription.Dispose(); + + source .HasObservers.Should().BeFalse("subscription disposal should propagate to all sources"); + predicateChanged.HasObservers.Should().BeFalse("subscription disposal should propagate to all sources"); + } + + protected override IObservable> BuildUut( + IObservable> source, + Func predicate, + bool suppressEmptyChangeSets) + => source.Filter( + predicateChanged: Observable.Return(predicate), + suppressEmptyChangeSets: suppressEmptyChangeSets); + } + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateAndReFiltering.IntegrationTests.cs b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateAndReFiltering.IntegrationTests.cs new file mode 100644 index 000000000..8491fe821 --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateAndReFiltering.IntegrationTests.cs @@ -0,0 +1,85 @@ +using System; +using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading.Tasks; + +using Bogus; +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class FilterFixture +{ + public static partial class DynamicPredicateAndReFiltering + { + public sealed class IntegrationTests + : IntegrationTestFixtureBase + { + [Fact(Timeout = 60_000)] + public async Task NotificationsOccurOnDifferentThreads_OperatorIsThreadSafe() + { + // Setup + var randomizer = new Randomizer(0x1234567); + + (var items, var changeSets) = GenerateStressItemsAndChangeSets( + editCount: 5_000, + maxChangeCount: 20, + randomizer: randomizer); + + var predicates = GenerateRandomIdInclusionMasks( + valueCount: 5_000, + randomizer: randomizer) + .Select(mask => new Func(item => Item.FilterByIdInclusionMask(mask, item))) + .ToArray(); + + using var source = new Subject>(); + using var predicateChanged = new Subject>(); + using var reapplyFilter = new Subject(); + + + // UUT Initialization + using var subscription = source + .Filter( + predicateChanged: predicateChanged, + reapplyFilter: reapplyFilter) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + + // UUT Action + await Task.WhenAll( + Task.Run(() => + { + foreach (var changeSet in changeSets) + source.OnNext(changeSet); + }), + Task.Run(() => + { + foreach (var predicate in predicates) + predicateChanged.OnNext(predicate); + }), + Task.Run(() => + { + for (var i = 0; i < 10_000; ++i) + reapplyFilter.OnNext(Unit.Default); + })); + + var finalPredicate = predicates[^1]; + + results.Error.Should().BeNull(); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items.Items.Where(finalPredicate), "the source colleciton should be filtered to include only items matching the final predicate"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + } + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateAndReFiltering.UnitTests.cs b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateAndReFiltering.UnitTests.cs new file mode 100644 index 000000000..e56101cb5 --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateAndReFiltering.UnitTests.cs @@ -0,0 +1,627 @@ +using System; +using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class FilterFixture +{ + public static partial class DynamicPredicateAndReFiltering + { + public sealed class UnitTests + : Base + { + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ChangesAreMadeBeforeInitialPredicateChangedValue_ItemsAreExcluded(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter( + predicateChanged: Observable.Never>(), + reapplyFilter: Observable.Never(), + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + // Add changes + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + // Refresh changes, with no item mutations. + source.Refresh(); + + // Refresh changes, with item mutations affecting filtering. + foreach (var item in source.Items) + item.IsIncluded = !item.IsIncluded; + source.Refresh(); + + // Remove changes + source.RemoveKeys(new[] { 2, 3 }); + + // Update changes, not affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = true } + }); + + // Update changes, affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false } + }); + + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + results.RecordedChangeSets.Count.Should().Be(6, "6 source operations were performed"); + else + results.RecordedChangeSets.Should().BeEmpty("empty changesets should be suppressed"); + results.RecordedItemsByKey.Should().BeEmpty("the predicate has not initialized"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void PredicateChangedChanges_ItemsAreReFiltered(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + using var predicateChanged = new BehaviorSubject>(Item.FilterByIsIncluded); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged, + reapplyFilter: Observable.Never(), + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + predicateChanged.OnNext(Item.FilterByEvenId); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 predicate change occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByEvenId), "newly-matching items should have been added, and newly-excluded items should have been removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous, DynamicParameter.Source)] + [InlineData(CompletionStrategy.Asynchronous, DynamicParameter.ReapplyFilter)] + [InlineData(CompletionStrategy.Immediate, DynamicParameter.Source)] + [InlineData(CompletionStrategy.Immediate, DynamicParameter.ReapplyFilter)] + public void PredicateChangedCompletesAfterInitialValue_CompletionWaitsForSourceAndReapplyFilterCompletion( + CompletionStrategy completionStrategy, + DynamicParameter lastCompletion) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + var predicateChanged = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject>() + : Observable.Return(Item.FilterByIsIncluded); + + var reapplyFilter = new Subject(); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged, + reapplyFilter: reapplyFilter) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (predicateChanged is Subject> subject) + { + subject.OnNext(Item.FilterByIsIncluded); + subject.OnCompleted(); + } + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial predicate, was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("changes could still be generated by the source"); + + + // UUT Action (second completion) + if (lastCompletion is DynamicParameter.ReapplyFilter) + source.Complete(); + else + reapplyFilter.OnCompleted(); + + results.Error.Should().BeNull(); + if (completionStrategy is CompletionStrategy.Asynchronous) + results.RecordedChangeSets.Skip(2).Should().BeEmpty("no source operations were performed"); + else + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("changes could still be generated by the filtering sources"); + + + // UUT Action (last completion) + if (lastCompletion is DynamicParameter.ReapplyFilter) + reapplyFilter.OnCompleted(); + else + source.Complete(); + + results.Error.Should().BeNull(); + if (completionStrategy is CompletionStrategy.Asynchronous) + results.RecordedChangeSets.Skip(2).Should().BeEmpty("no source operations were performed"); + else + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all source streams have completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.SuppressEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void PredicateChangedCompletesBeforeInitialValue_CompletionPropagatesIfEmptyChangesetsAreSuppressed( + CompletionStrategy completionStrategy, + EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var predicateChanged = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject>() + : Observable.Empty>(); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged, + reapplyFilter: Observable.Never(), + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (predicateChanged is Subject> subject) + subject.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + results.HasCompleted.Should().BeFalse("the source has completed, but further empty changesets can occur"); + else + results.HasCompleted.Should().BeTrue("the source has completed, and no further changesets can occur"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void PredicateChangedFails_ErrorPropagates(CompletionStrategy completionStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var error = new Exception("Test"); + + var predicateChanged = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject>() + : Observable.Throw>(error); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged, + reapplyFilter: Observable.Never()) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (predicateChanged is Subject> subject) + subject.OnError(error); + + results.Error.Should().Be(error, "errors should propagate downstream"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Fact] + public void PredicateChangedIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.Filter( + source: Observable.Return(ChangeSet.Empty), + reapplyFilter: Observable.Never(), + predicateChanged: null!)) + .Should() + .Throw(); + + [Theory] + [InlineData(CompletionStrategy.Immediate, DynamicParameter.PredicateChanged)] + [InlineData(CompletionStrategy.Immediate, DynamicParameter.Source)] + [InlineData(CompletionStrategy.Asynchronous, DynamicParameter.PredicateChanged)] + [InlineData(CompletionStrategy.Asynchronous, DynamicParameter.Source)] + public void ReapplyFilterCompletes_CompletionWaitsForSourceAndPredicateChangedCompletion( + CompletionStrategy completionStrategy, + DynamicParameter lastCompletion) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + var predicateChanged = new BehaviorSubject>(Item.FilterByIsIncluded); + + var reapplyFilter = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject() + : Observable.Empty(); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged, + reapplyFilter: reapplyFilter) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (reapplyFilter is Subject subject) + subject.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("changes could still be generated by the source"); + + + // UUT Action (second completion) + if (lastCompletion is DynamicParameter.PredicateChanged) + source.Complete(); + else + predicateChanged.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("changes could still be generated by other source streams"); + + + // UUT Action (last completion) + if (lastCompletion is DynamicParameter.PredicateChanged) + predicateChanged.OnCompleted(); + else + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all input streams have completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void ReapplyFilterFails_ErrorPropagates(CompletionStrategy completionStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var error = new Exception("Test"); + + var reapplyFilter = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject() + : Observable.Throw(error); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicateChanged: Observable.Return(Item.FilterByIsIncluded), + reapplyFilter: reapplyFilter) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (reapplyFilter is Subject subject) + subject.OnError(error); + + results.Error.Should().Be(error, "errors should propagate downstream"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Fact] + public void ReapplyFilterIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.Filter( + source: Observable.Return(ChangeSet.Empty), + predicateChanged: Observable.Return(Item.FilterByIsIncluded), + reapplyFilter: null!)) + .Should() + .Throw(); + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ReapplyFilterOccurs_ItemsAreReFiltered(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + using var reapplyFilter = new Subject(); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + var predicate = Item.FilterByIsIncluded; + + + // UUT Initialization + using var subscription = source.Connect() + .Filter( + predicateChanged: Observable.Return>(item => predicate.Invoke(item)), + reapplyFilter: reapplyFilter, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(predicate), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + source.Items[1].IsIncluded = false; + source.Items[5].IsIncluded = true; + predicate = Item.FilterByEvenId; + reapplyFilter.OnNext(Unit.Default); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 re-filter request occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(predicate), "newly-matching items should have been added, and newly-excluded items should have been removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.SuppressEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void SourceCompletesWhenEmpty_CompletionPropagatesWhenEmptyChangesetsAreSuppressed( + CompletionStrategy completionStrategy, + EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Initialization & Action + if (completionStrategy is CompletionStrategy.Immediate) + source.Complete(); + + using var subscription = source.Connect() + .Filter( + predicateChanged: Observable.Concat( + Observable.Return(Item.FilterByIsIncluded), + Observable.Never>()), + reapplyFilter: Observable.Never(), + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + results.HasCompleted.Should().BeFalse("the source has completed, but further empty changesets can occur"); + else + results.HasCompleted.Should().BeTrue("the source has completed, and no further changesets can occur"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous, DynamicParameter.PredicateChanged)] + [InlineData(CompletionStrategy.Asynchronous, DynamicParameter.ReapplyFilter)] + [InlineData(CompletionStrategy.Immediate, DynamicParameter.PredicateChanged)] + [InlineData(CompletionStrategy.Immediate, DynamicParameter.ReapplyFilter)] + public void SourceCompletesWhenNotEmpty_CompletionWaitsForPredicateChangedAndReapplyFilterCompletion( + CompletionStrategy completionStrategy, + DynamicParameter lastCompletion) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + using var predicateChanged = new BehaviorSubject>(Item.FilterByIsIncluded); + using var reapplyFilter = new Subject(); + + + // UUT Initialization & Action + if (completionStrategy is CompletionStrategy.Immediate) + source.Complete(); + + using var subscription = source.Connect() + .Filter( + predicateChanged: predicateChanged, + reapplyFilter: reapplyFilter) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the collection could still change due to new predicates"); + + + // UUT Action (second completion) + if (lastCompletion is DynamicParameter.PredicateChanged) + reapplyFilter.OnCompleted(); + else + predicateChanged.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "no changes should have been made"); + results.HasCompleted.Should().BeFalse("the collection could still change due to outstanding source streams"); + + + // UUT Action (last completion) + if (lastCompletion is DynamicParameter.PredicateChanged) + predicateChanged.OnCompleted(); + else + reapplyFilter.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "no changes should have been made"); + results.HasCompleted.Should().BeTrue("all source streams have completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Fact] + public void SubscriptionIsDisposed_SubscriptionDisposalPropagates() + { + // Setup + using var source = new Subject>(); + using var predicateChanged = new BehaviorSubject>(Item.FilterByIsIncluded); + using var reapplyFilter = new Subject(); + + + // UUT Initialization + using var subscription = source + .Filter( + predicateChanged: predicateChanged, + reapplyFilter: reapplyFilter) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + subscription.Dispose(); + + source .HasObservers.Should().BeFalse("subscription disposal should propagate to all sources"); + predicateChanged.HasObservers.Should().BeFalse("subscription disposal should propagate to all sources"); + reapplyFilter .HasObservers.Should().BeFalse("subscription disposal should propagate to all sources"); + } + + protected override IObservable> BuildUut( + IObservable> source, + Func predicate, + bool suppressEmptyChangeSets) + => source.Filter( + predicateChanged: Observable.Return(predicate), + reapplyFilter: Observable.Never(), + suppressEmptyChangeSets: suppressEmptyChangeSets); + } + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateState.IntegrationTests.cs b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateState.IntegrationTests.cs new file mode 100644 index 000000000..49654ba16 --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateState.IntegrationTests.cs @@ -0,0 +1,76 @@ +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading.Tasks; + +using Bogus; +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class FilterFixture +{ + public static partial class DynamicPredicateState + { + public sealed class IntegrationTests + : IntegrationTestFixtureBase + { + [Fact] + public async Task NotificationsOccurOnDifferentThreads_OperatorIsThreadSafe() + { + // Setup + var randomizer = new Randomizer(0x1234567); + + (var items, var changeSets) = GenerateStressItemsAndChangeSets( + editCount: 5_000, + maxChangeCount: 20, + randomizer: randomizer); + + var predicateStates = GenerateRandomIdInclusionMasks( + valueCount: 5_000, + randomizer: randomizer) + .ToArray(); + + using var source = new Subject>(); + using var predicateState = new Subject(); + + + // UUT Initialization + using var subscription = source + .Filter( + predicate: Item.FilterByIdInclusionMask, + predicateState: predicateState) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + + // UUT Action + await Task.WhenAll( + Task.Run(() => + { + foreach (var changeSet in changeSets) + source.OnNext(changeSet); + }), + Task.Run(() => + { + foreach (var value in predicateStates) + predicateState.OnNext(value); + })); + + var finalPredicateState = predicateStates[^1]; + + results.Error.Should().BeNull(); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items.Items.Where(item => Item.FilterByIdInclusionMask(finalPredicateState, item)), "the source colleciton should be filtered to include only items matching the final predicate"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + } + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateState.UnitTests.cs b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateState.UnitTests.cs new file mode 100644 index 000000000..1b0c55c88 --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.DynamicPredicateState.UnitTests.cs @@ -0,0 +1,424 @@ +using System; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class FilterFixture +{ + public static partial class DynamicPredicateState + { + public sealed class UnitTests + : Base + { + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void ChangesAreMadeBeforeInitialPredicateStateValue_ItemsAreExcluded(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Initialization + using var subscription = source + .Connect() + .Filter( + predicate: static (_, item) => item.IsIncluded, + predicateState: Observable.Never(), + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + // Add changes + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = false } + }); + + // Refresh changes, with no item mutations. + source.Refresh(); + + // Refresh changes, with item mutations affecting filtering. + foreach (var item in source.Items) + item.IsIncluded = !item.IsIncluded; + source.Refresh(); + + // Remove changes + source.RemoveKeys(new[] { 2, 3 }); + + // Update changes, not affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = false }, + new Item() { Id = 4, IsIncluded = true } + }); + + // Update changes, affecting filtering + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false } + }); + + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + results.RecordedChangeSets.Count.Should().Be(6, "6 source operations were performed"); + else + results.RecordedChangeSets.Should().BeEmpty("empty changesets should be suppressed"); + results.RecordedItemsByKey.Should().BeEmpty("the predicate has not initialized"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Fact] + public void PredicateIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.Filter( + source: Observable.Return(ChangeSet.Empty), + predicate: null!, + predicateState: Observable.Empty())) + .Should() + .Throw(); + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void PredicateStateCompletesAfterInitialValue_CompletionWaitsForSourceCompletion(CompletionStrategy completionStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + var predicateState = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject() + : Observable.Return(new object()); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicate: static (_, item) => item.IsIncluded, + predicateState: predicateState) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (predicateState is Subject subject) + { + subject.OnNext(new()); + subject.OnCompleted(); + } + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial predicate, was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("changes could still be generated by the source"); + + + // UUT Action + source.Complete(); + + results.Error.Should().BeNull(); + if (completionStrategy is CompletionStrategy.Asynchronous) + results.RecordedChangeSets.Skip(2).Should().BeEmpty("no source operations were performed"); + else + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("all source streams have completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.SuppressEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void PredicateStateCompletesBeforeInitialValue_CompletionPropagatesIfEmptyChangesetsAreSuppressed( + CompletionStrategy completionStrategy, + EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var predicateState = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject() + : Observable.Empty(); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicate: static (_, item) => item.IsIncluded, + predicateState: predicateState, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + if (predicateState is Subject subject) + subject.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + results.HasCompleted.Should().BeFalse("additional empty changesets can occur"); + else + results.HasCompleted.Should().BeTrue("only empty changesets can occur"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void PredicateStateFails_ErrorPropagates(CompletionStrategy completionStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + var error = new Exception("Test"); + + var predicateState = (completionStrategy is CompletionStrategy.Asynchronous) + ? new Subject() + : Observable.Throw(error); + + + // UUT Initialization & Action + using var subscription = source.Connect() + .Filter( + predicate: static (_, item) => item.IsIncluded, + predicateState: predicateState) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (predicateState is Subject subject) + subject.OnError(error); + + results.Error.Should().Be(error, "errors should propagate downstream"); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + } + + [Fact] + public void PredicateStateIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.Filter( + source: Observable.Return(ChangeSet.Empty), + predicate: static (object _, Item item) => item.IsIncluded, + predicateState: null!)) + .Should() + .Throw(); + + [Theory] + [InlineData(EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void PredicateStateChanges_ItemsAreReFiltered(EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + using var predicateState = new BehaviorSubject(0x5); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + + // UUT Initialization + using var subscription = source.Connect() + .Filter( + predicate: Item.FilterByIdInclusionMask, + predicateState: predicateState, + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(item => Item.FilterByIdInclusionMask(predicateState.Value, item)), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + predicateState.OnNext(0xA); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 predicate change occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(item => Item.FilterByIdInclusionMask(predicateState.Value, item)), "newly-matching items should have been added, and newly-excluded items should have been removed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Asynchronous, EmptyChangesetPolicy.SuppressEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.IncludeEmptyChangesets)] + [InlineData(CompletionStrategy.Immediate, EmptyChangesetPolicy.SuppressEmptyChangesets)] + public void SourceCompletesWhenEmpty_CompletionPropagatesWhenEmptyChangesetsAreSuppressed( + CompletionStrategy completionStrategy, + EmptyChangesetPolicy emptyChangesetPolicy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Initialization & Action + if (completionStrategy is CompletionStrategy.Immediate) + source.Complete(); + + using var subscription = source.Connect() + .Filter( + predicate: static (_, item) => item.IsIncluded, + predicateState: Observable.Concat( + Observable.Return(new object()), + Observable.Never()), + suppressEmptyChangeSets: emptyChangesetPolicy is EmptyChangesetPolicy.SuppressEmptyChangesets) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + if (emptyChangesetPolicy is EmptyChangesetPolicy.IncludeEmptyChangesets) + results.HasCompleted.Should().BeFalse("the source has completed, but further empty changesets can occur"); + else + results.HasCompleted.Should().BeTrue("the source has completed, and no further changesets can occur"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void SourceCompletesWhenNotEmpty_CompletionWaitsForPredicateChangedCompletion(CompletionStrategy completionStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + source.AddOrUpdate(new[] + { + new Item() { Id = 1, IsIncluded = true }, + new Item() { Id = 2, IsIncluded = true }, + new Item() { Id = 3, IsIncluded = true }, + new Item() { Id = 4, IsIncluded = false }, + new Item() { Id = 5, IsIncluded = false }, + new Item() { Id = 6, IsIncluded = false } + }); + + using var predicateState = new BehaviorSubject(new object()); + + + // UUT Initialization & Action + if (completionStrategy is CompletionStrategy.Immediate) + source.Complete(); + + using var subscription = source.Connect() + .Filter( + predicate: static (_, item) => item.IsIncluded, + predicateState: predicateState) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "an initial changeset was published"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "all matching items should have propagated"); + results.HasCompleted.Should().BeFalse("the collection could still change due to new predicates"); + + + // UUT Action + predicateState.OnCompleted(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items.Where(Item.FilterByIsIncluded), "no changes should have been made"); + results.HasCompleted.Should().BeTrue("all source streams have completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Fact] + public void SubscriptionIsDisposed_SubscriptionDisposalPropagates() + { + // Setup + using var source = new Subject>(); + using var predicateState = new BehaviorSubject(new()); + + + // UUT Initialization + using var subscription = source + .Filter( + predicate: static (_, item) => item.IsIncluded, + predicateState: predicateState) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + subscription.Dispose(); + + source .HasObservers.Should().BeFalse("subscription disposal should propagate to all sources"); + predicateState .HasObservers.Should().BeFalse("subscription disposal should propagate to all sources"); + } + + protected override IObservable> BuildUut( + IObservable> source, + Func predicate, + bool suppressEmptyChangeSets) + => source.Filter( + predicate: (_, item) => predicate.Invoke(item), + predicateState: Observable.Return(new object()), + suppressEmptyChangeSets: suppressEmptyChangeSets); + } + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.Static.cs b/src/DynamicData.Tests/Cache/FilterFixture.Static.cs new file mode 100644 index 000000000..f3d61ac0a --- /dev/null +++ b/src/DynamicData.Tests/Cache/FilterFixture.Static.cs @@ -0,0 +1,90 @@ +using System; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class FilterFixture +{ + public sealed class Static + : Base + { + [Fact] + public void FilterIsNull_ThrowsException() + => FluentActions.Invoking(static () => ObservableCacheEx.Filter( + source: Observable.Empty>(), + filter: null!)) + .Should() + .Throw(); + + [Theory] + [InlineData(CompletionStrategy.Asynchronous)] + [InlineData(CompletionStrategy.Immediate)] + public void SourceCompletes_CompletionPropagates(CompletionStrategy completionStrategy) + { + // Setup + using var source = new TestSourceCache(Item.SelectId); + + + // UUT Initialization & Action + if (completionStrategy is CompletionStrategy.Immediate) + source.Complete(); + + using var subscription = source.Connect(suppressEmptyChangeSets: false) + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + if (completionStrategy is CompletionStrategy.Asynchronous) + source.Complete(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.HasCompleted.Should().BeTrue("the source has completed"); + + + // Final verification + results.ShouldNotSupportSorting("sorting is not supported by filter operators"); + } + + [Fact] + public void SubscriptionIsDisposed_SubscriptionDisposalPropagates() + { + // Setup + using var source = new Subject>(); + + + // UUT Intialization + using var subscription = source + .Filter(Item.FilterByIsIncluded) + .ValidateSynchronization() + .ValidateChangeSets(Item.SelectId) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEmpty("the source has not initialized"); + results.HasCompleted.Should().BeFalse("the source has not completed"); + + + // UUT Action + subscription.Dispose(); + + source.HasObservers.Should().BeFalse("subscription disposal should propagate to all sources"); + } + + protected override IObservable> BuildUut( + IObservable> source, + Func predicate, + bool suppressEmptyChangeSets) + => source.Filter( + filter: predicate, + suppressEmptyChangeSets: suppressEmptyChangeSets); + } +} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.WithPredicateState.cs b/src/DynamicData.Tests/Cache/FilterFixture.WithPredicateState.cs deleted file mode 100644 index 2f3abf906..000000000 --- a/src/DynamicData.Tests/Cache/FilterFixture.WithPredicateState.cs +++ /dev/null @@ -1,805 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reactive.Linq; -using System.Reactive.Subjects; -using System.Threading; -using System.Threading.Tasks; - -using Bogus; -using FluentAssertions; -using Xunit; - -using DynamicData.Tests.Utilities; - -namespace DynamicData.Tests.Cache; - -public partial class FilterFixture -{ - public sealed class WithPredicateState - { - [Fact] - public void ChangesAreMadeAfterInitialPredicateState_ItemsAreFiltered() - { - using var source = new SourceCache(static item => item.Id); - using var predicateState = new Subject(); - - using var subscription = source - .Connect() - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - - // Set initial state - predicateState.OnNext(new()); - - results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); - - - // Test Add changes - source.AddOrUpdate(new[] - { - new Item() { Id = 1, IsIncluded = true }, - new Item() { Id = 2, IsIncluded = true }, - new Item() { Id = 3, IsIncluded = false }, - new Item() { Id = 4, IsIncluded = false } - }); - - results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - // Test Refresh changes, with no item mutations. - source.Refresh(); - - results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); - results.RecordedChangeSets.Skip(1).First().Select(static change => change.Reason).Should().AllBeEquivalentTo(ChangeReason.Refresh, "all included items should have been refreshed"); - results.RecordedChangeSets.Skip(1).First().Select(static change => change.Current).Should().BeEquivalentTo(EnumerateFilteredItems(), "all included items should have been refreshed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - // Test Refresh changes, with item mutations affecting filtering. - foreach (var item in source.Items) - item.IsIncluded = !item.IsIncluded; - source.Refresh(); - - results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - // Test Remove changes - source.RemoveKeys(new[] { 2, 3 }); - - results.RecordedChangeSets.Skip(3).Count().Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - // Test Update changes, not affecting filtering - source.AddOrUpdate(new[] - { - new Item() { Id = 1, IsIncluded = false }, - new Item() { Id = 4, IsIncluded = true } - }); - - results.RecordedChangeSets.Skip(4).Count().Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - // Test Update changes, affecting filtering - source.AddOrUpdate(new[] - { - new Item() { Id = 1, IsIncluded = true }, - new Item() { Id = 4, IsIncluded = false } - }); - - results.RecordedChangeSets.Skip(5).Count().Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - IEnumerable EnumerateFilteredItems() - => source.Items.Where(static item => item.IsIncluded); - } - - [Fact] - public void ChangesAreMadeAfterMultiplePredicateStateChanges_ItemsAreFilteredWithLatestPredicateState() - { - using var source = new SourceCache(static item => item.Id); - using var predicateState = new BehaviorSubject(1); - - using var subscription = source - .Connect() - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.Id == predicateState) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - - // Publish multiple state changes - predicateState.OnNext(2); - predicateState.OnNext(3); - - results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); - - - // Test filtering of items, by state - source.AddOrUpdate(new[] - { - new Item() { Id = 1, IsIncluded = true }, - new Item() { Id = 2, IsIncluded = true }, - new Item() { Id = 3, IsIncluded = false }, - new Item() { Id = 4, IsIncluded = false } - }); - - results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, source.Items.Where(item => item.Id == predicateState.Value)); - } - - [Fact] - public void ChangesAreMadeBeforeInitialPredicateState_ItemsAreFilteredOnPredicateState() - { - using var source = new SourceCache(static item => item.Id); - using var predicateState = new Subject(); - - using var subscription = source - .Connect() - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - - results.RecordedChangeSets.Should().BeEmpty("no source operations have been performed"); - - - // Test Add changes - source.AddOrUpdate(new[] - { - new Item() { Id = 1, IsIncluded = true }, - new Item() { Id = 2, IsIncluded = true }, - new Item() { Id = 3, IsIncluded = false }, - new Item() { Id = 4, IsIncluded = false } - }); - - // Test Refresh changes, with no item mutations. - source.Refresh(); - - // Test Refresh changes, with item mutations affecting filtering. - foreach (var item in source.Items) - item.IsIncluded = !item.IsIncluded; - source.Refresh(); - - // Test Remove changes - source.RemoveKeys(new[] { 2, 3 }); - - // Test Update changes, not affecting filtering - source.AddOrUpdate(new[] - { - new Item() { Id = 1, IsIncluded = false }, - new Item() { Id = 4, IsIncluded = true } - }); - - // Test Update changes, affecting filtering - source.AddOrUpdate(new[] - { - new Item() { Id = 1, IsIncluded = true }, - new Item() { Id = 4, IsIncluded = false } - }); - - results.RecordedChangeSets.Should().BeEmpty("the predicate state has not initialized"); - results.RecordedItemsByKey.Should().BeEmpty("the predicate state has not initialized"); - - - // Set initial state - predicateState.OnNext(new()); - - results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, source.Items.Where(static item => item.IsIncluded)); - } - - [Fact] - public void ItemsAreMoved_ChangesAreIgnored() - { - using var source = new Subject>(); - using var predicateState = new Subject(); - - using var subscription = source - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - var items = new[] - { - new Item() { Id = 1, IsIncluded = true }, - new Item() { Id = 2, IsIncluded = true }, - new Item() { Id = 3, IsIncluded = false }, - new Item() { Id = 4, IsIncluded = false } - }; - - - // Set initial state - predicateState.OnNext(new()); - source.OnNext(new ChangeSet(items - .Select((item, index) => new Change( - reason: ChangeReason.Add, - key: item.Id, - current: item, - index: index)))); - - results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - // Test Moved changes, for both included and excluded items. - source.OnNext(new ChangeSet() - { - new(reason: ChangeReason.Moved, key: 1, current: items[0], previous: default, previousIndex: 0, currentIndex: 1), - new(reason: ChangeReason.Moved, key: 2, current: items[1], previous: default, previousIndex: 0, currentIndex: 2), - new(reason: ChangeReason.Moved, key: 3, current: items[2], previous: default, previousIndex: 1, currentIndex: 0) - }); - - results.RecordedChangeSets.Skip(1).Should().BeEmpty("the move operation should have been ignored"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - IEnumerable EnumerateFilteredItems() - => items.Where(static item => item.IsIncluded); - } - - [Fact] - public void PredicateIsNull_ExceptionIsThrown() - => FluentActions.Invoking(() => Observable.Empty>() - .Filter( - predicateState: Observable.Empty(), - predicate: null!)) - .Should() - .Throw(); - - [Fact] - public void PredicateStateChanges_ItemsAreRefiltered() - { - using var source = new SourceCache(static item => item.Id); - using var predicateState = new BehaviorSubject(1); - - using var subscription = source - .Connect() - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.Id == predicateState) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - - // Test filtering of items, by state - source.AddOrUpdate(new[] - { - new Item() { Id = 1, IsIncluded = true }, - new Item() { Id = 2, IsIncluded = true }, - new Item() { Id = 3, IsIncluded = false }, - new Item() { Id = 4, IsIncluded = false } - }); - - results.RecordedChangeSets.Count.Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - // Publish a state change, to change the filtering - predicateState.OnNext(2); - - results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "one source operation was performed"); - ShouldBeValid(results, EnumerateFilteredItems()); - - - IEnumerable EnumerateFilteredItems() - => source.Items.Where(item => item.Id == predicateState.Value); - } - - [Fact] - public void PredicateStateCompletesAfterInitialValue_CompletionWaitsForSourceCompletion() - { - using var source = new Subject>(); - - using var subscription = source - .Filter( - predicateState: Observable.Return(new object()), - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - results.HasCompleted.Should().BeFalse("changes could still be generated by the source"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - - source.OnCompleted(); - - results.HasCompleted.Should().BeTrue("all input streams have completed"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - } - - [Fact] - public void PredicateStateCompletesImmediately_CompletionIsPropagated() - { - using var source = new Subject>(); - - using var subscription = source - .Filter( - predicateState: Observable.Empty(), - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - results.HasCompleted.Should().BeTrue("completion of the predicate state stream before it emits any values means that items can never be accepted by the filter predicate"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - - source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - } - - [Fact] - public void PredicateStateErrors_ErrorIsPropagated() - { - using var source = new Subject>(); - using var predicateState = new Subject(); - - using var subscription = source - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - - var error = new Exception("This is a test."); - predicateState.OnError(error); - - results.Error.Should().Be(error, "errors should be propagated"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - - source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - } - - [Fact] - public void PredicateStateErrorsImmediately_ErrorIsPropagated() - { - using var source = new Subject>(); - - var error = new Exception("This is a test."); - - using var subscription = source - .Filter( - predicateState: Observable.Throw(error), - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - results.Error.Should().Be(error, "errors should be propagated"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - - source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - } - - [Fact] - public void PredicateStateIsNull_ExceptionIsThrown() - => FluentActions.Invoking(() => Observable.Empty>() - .Filter( - predicateState: (null as IObservable)!, - predicate: static (_, _) => true)) - .Should() - .Throw(); - - [Fact] - public async Task SourceAndPredicateStateNotifyFromDifferentThreads_FilteringIsThreadSafe() - { - var randomizer = new Randomizer(0x1234567); - - (var items, var changeSets) = GenerateStressItemsAndChangeSets( - editCount: 5_000, - maxChangeCount: 20, - randomizer: randomizer); - - var predicateStates = GenerateRandomPredicateStates( - valueCount: 5_000, - randomizer: randomizer); - - - using var source = new Subject>(); - - using var predicateState = new Subject(); - - using var subscription = source - .Filter( - predicateState: predicateState, - predicate: Item.FilterByIdInclusionMask) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - - await Task.WhenAll( - Task.Run( - action: () => - { - foreach (var changeSet in changeSets) - source.OnNext(changeSet); - }, - cancellationToken: timeoutSource.Token), - Task.Run( - action: () => - { - foreach (var value in predicateStates) - predicateState.OnNext(value); - }, - cancellationToken: timeoutSource.Token)); - - var finalPredicateState = predicateStates[^1]; - ShouldBeValid(results, items.Items.Where(item => Item.FilterByIdInclusionMask(finalPredicateState, item))); - } - - [Fact] - public void SourceCompletesWhenEmpty_CompletionIsPropagated() - { - using var source = new Subject>(); - - using var predicateState = new Subject(); - - using var subscription = source - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - source.OnCompleted(); - - results.HasCompleted.Should().BeTrue("no further changes can occur when there are no items to be filtered or unfiltered"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - - predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - } - - [Fact] - public void SourceCompletesWhenNotEmpty_CompletionWaitsForStateCompletion() - { - using var source = new Subject>(); - - using var predicateState = new Subject(); - - using var subscription = source - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - source.OnNext(new ChangeSet() { new(reason: ChangeReason.Add, key: 1, current: new Item() { Id = 1, IsIncluded = true }) }); - source.OnCompleted(); - - results.HasCompleted.Should().BeFalse("changes could still be generated by changes in predicate state"); - results.RecordedChangeSets.Should().BeEmpty("the predicate has not initialized"); - - predicateState.OnCompleted(); - - results.HasCompleted.Should().BeTrue("all input streams have completed"); - results.RecordedChangeSets.Should().BeEmpty("the predicate never initialized"); - } - - [Fact] - public void SourceCompletesImmediately_CompletionIsPropagated() - { - using var predicateState = new Subject(); - - using var subscription = Observable.Empty>() - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - results.HasCompleted.Should().BeTrue("no further changes can occur when there are no items to be filtered or unfiltered"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - - predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - } - - [Fact] - public void SourceErrors_ErrorIsPropagated() - { - using var source = new Subject>(); - - using var predicateState = new Subject(); - - using var subscription = source - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - - var error = new Exception("This is a test."); - source.OnError(error); - - results.Error.Should().Be(error, "errors should be propagated"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - - source.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - } - - [Fact] - public void SourceErrorsImmediately_ErrorIsPropagated() - { - using var predicateState = new Subject(); - - var error = new Exception("This is a test."); - - using var subscription = Observable.Throw>(error) - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - results.Error.Should().Be(error, "errors should be propagated"); - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - - predicateState.HasObservers.Should().BeFalse("all subscriptions should have been disposed, during finalization of the stream"); - } - - [Fact] - public void SourceIsNull_ExceptionIsThrown() - => FluentActions.Invoking(() => ObservableCacheEx.Filter( - source: (null as IObservable>)!, - predicateState: Observable.Empty(), - predicate: static (_, _) => true)) - .Should() - .Throw(); - - [Fact] - public void SubscriptionIsDisposed_UnsubscriptionIsPropagated() - { - using var source = new Subject>(); - - using var predicateState = new Subject(); - - using var subscription = source - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - - subscription.Dispose(); - - source.HasObservers.Should().BeFalse("subscription disposal should be propagated to all input streams"); - predicateState.HasObservers.Should().BeFalse("subscription disposal should be propagated to all input streams"); - - results.RecordedChangeSets.Should().BeEmpty("no source operations were performed"); - } - - [Theory] - [InlineData("source", "predicateState")] - [InlineData("predicateState", "source")] - public void SuppressEmptyChangeSetsIsFalse_EmptyChangesetsArePropagatedAndOnlyFinalCompletionIsPropagated(params string[] completionOrder) - { - using var source = new Subject>(); - - using var predicateState = new Subject(); - - using var subscription = source - .Filter( - predicateState: predicateState, - predicate: static (predicateState, item) => item.IsIncluded, - suppressEmptyChangeSets: false) - .ValidateSynchronization() - .ValidateChangeSets(static item => item.Id) - .RecordCacheItems(out var results); - - - // Initialize the predicate - predicateState.OnNext(new object()); - - results.RecordedChangeSets.Count.Should().Be(1, "the predicate state was initialized"); - results.RecordedChangeSets[0].Should().BeEmpty("there are no items in the collection"); - ShouldBeValid(results, Enumerable.Empty()); - - - // Publish an empty changeset - source.OnNext(ChangeSet.Empty); - - results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "a source operation was performed"); - results.RecordedChangeSets.Skip(1).First().Should().BeEmpty("the source changeset was empty"); - ShouldBeValid(results, Enumerable.Empty()); - - - // Publish a changeset with only excluded items - source.OnNext(new ChangeSet() - { - new(reason: ChangeReason.Add, key: 1, current: new Item() { Id = 1, IsIncluded = false }), - new(reason: ChangeReason.Add, key: 2, current: new Item() { Id = 2, IsIncluded = false }), - new(reason: ChangeReason.Add, key: 3, current: new Item() { Id = 3, IsIncluded = false }) - }); - - results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "a source operation was performed"); - results.RecordedChangeSets.Skip(2).First().Should().BeEmpty("all source items were excluded"); - ShouldBeValid(results, Enumerable.Empty()); - - for (var i = 0; i < completionOrder.Length; ++i) - { - switch (completionOrder[i]) - { - case nameof(source): - source.OnCompleted(); - break; - - case nameof(predicateState): - predicateState.OnCompleted(); - break; - } - - if (i < (completionOrder.Length - 1)) - results.HasCompleted.Should().BeFalse("not all input streams have completed"); - } - - results.HasCompleted.Should().BeTrue("all input streams have completed"); - } - - private static void ShouldBeValid( - CacheItemRecordingObserver results, - IEnumerable expectedFilteredItems) - { - results.Error.Should().BeNull("no errors should have occurred"); - results.HasCompleted.Should().BeFalse("no completion events should have occurred"); - results.RecordedChangeSets.Should().AllSatisfy(changeSet => - { - if (changeSet.Count is not 0) - changeSet.Should().AllSatisfy(change => - { - change.CurrentIndex.Should().Be(-1, "sorting indexes should not be propagated"); - change.PreviousIndex.Should().Be(-1, "sorting indexes should not be propagated"); - }); - }); - results.RecordedItemsByKey.Values.Should().BeEquivalentTo(expectedFilteredItems, "all filtered items should match the filter predicate"); - results.RecordedItemsSorted.Should().BeEmpty("sorting is not supported by filter opreators"); - } - - private static (ICache items, IReadOnlyList> changeSets) GenerateStressItemsAndChangeSets( - int editCount, - int maxChangeCount, - Randomizer randomizer) - { - // Not exercising Moved, since ChangeAwareCache<> doesn't support it, and I'm too lazy to implement it by hand. - var changeReasons = new[] - { - ChangeReason.Add, - ChangeReason.Refresh, - ChangeReason.Remove, - ChangeReason.Update - }; - - // Weights are chosen to make the cache size likely to grow over time, - // exerting more pressure on the system the longer the benchmark runs. - // Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache). - var changeReasonWeightsWhenCountIs0 = new[] - { - 1f, // Add - 0f, // Refresh - 0f, // Remove - 0f // Update - }; - - var changeReasonWeightsOtherwise = new[] - { - 0.30f, // Add - 0.25f, // Refresh - 0.20f, // Remove - 0.25f // Update - }; - - var nextItemId = 1; - - var changeSets = new List>(capacity: editCount); - - var items = new ChangeAwareCache(); - - while (changeSets.Count < changeSets.Capacity) - { - var changeCount = randomizer.Int(1, maxChangeCount); - for (var i = 0; i < changeCount; ++i) - { - var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch - { - 0 => changeReasonWeightsWhenCountIs0, - _ => changeReasonWeightsOtherwise - }); - - switch (changeReason) - { - case ChangeReason.Add: - items.AddOrUpdate( - item: new Item() - { - Id = nextItemId, - IsIncluded = randomizer.Bool() - }, - key: nextItemId); - ++nextItemId; - break; - - case ChangeReason.Refresh: - items.Refresh(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1))); - break; - - case ChangeReason.Remove: - items.Remove(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1))); - break; - - case ChangeReason.Update: - var id = items.Keys.ElementAt(randomizer.Int(0, items.Count - 1)); - items.AddOrUpdate( - item: new Item() - { - Id = id, - IsIncluded = randomizer.Bool() - }, - key: id); - break; - } - } - - changeSets.Add(items.CaptureChanges()); - } - - return (items, changeSets); - } - - private static IReadOnlyList GenerateRandomPredicateStates( - int valueCount, - Randomizer randomizer) - { - var values = new List(capacity: valueCount); - - while (values.Count < valueCount) - values.Add(randomizer.Int()); - - return values; - } - - private class Item - { - public static bool FilterByIdInclusionMask( - int idInclusionMask, - Item item) - => ((item.Id & idInclusionMask) == 0) && item.IsIncluded; - - public required int Id { get; init; } - - public bool IsIncluded { get; set; } - - public override string ToString() - => $"{{ Id = {Id}, IsIncluded = {IsIncluded} }}"; - } - } -} diff --git a/src/DynamicData.Tests/Cache/FilterFixture.cs b/src/DynamicData.Tests/Cache/FilterFixture.cs index 1b3fc3ca4..d42e11579 100644 --- a/src/DynamicData.Tests/Cache/FilterFixture.cs +++ b/src/DynamicData.Tests/Cache/FilterFixture.cs @@ -1,231 +1,152 @@ -using System; +using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; -using DynamicData.Tests.Domain; - -using FluentAssertions; - -using Xunit; +using Bogus; namespace DynamicData.Tests.Cache; -public partial class FilterFixture : IDisposable +public static partial class FilterFixture { - private readonly ChangeSetAggregator _results; - - private readonly ISourceCache _source; - - public FilterFixture() - { - _source = new SourceCache(p => p.Name); - _results = _source.Connect(p => p.Age > 20).AsAggregator(); - } - - [Fact] - public void AddMatched() - { - var person = new Person("Adult1", 50); - _source.AddOrUpdate(person); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Data.Count.Should().Be(1, "Should be 1 item in the cache"); - _results.Data.Items[0].Should().Be(person, "Should be same person"); - } - - [Fact] - public void AddNotMatched() + public enum CompletionStrategy { - var person = new Person("Adult1", 10); - _source.AddOrUpdate(person); - - _results.Messages.Count.Should().Be(0, "Should have no item updates"); - _results.Data.Count.Should().Be(0, "Cache should have no items"); + Immediate, + Asynchronous } - [Fact] - public void AddNotMatchedAndUpdateMatched() + public enum EmptyChangesetPolicy { - const string key = "Adult1"; - var notmatched = new Person(key, 19); - var matched = new Person(key, 21); - - _source.Edit( - updater => - { - updater.AddOrUpdate(notmatched); - updater.AddOrUpdate(matched); - }); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].First().Current.Should().Be(matched, "Should be same person"); - _results.Data.Items[0].Should().Be(matched, "Should be same person"); + SuppressEmptyChangesets, + IncludeEmptyChangesets } - [Fact] - public void AttemptedRemovalOfANonExistentKeyWillBeIgnored() + public enum DynamicParameter { - const string key = "Adult1"; - _source.Remove(key); - _results.Messages.Count.Should().Be(0, "Should be 0 updates"); + Source, + PredicateChanged, + ReapplyFilter } - [Fact] - public void BatchOfUniqueUpdates() + public record Item { - var people = Enumerable.Range(1, 100).Select(i => new Person("Name" + i, i)).ToArray(); + public static bool FilterByEvenId(Item item) + => (item.Id % 2) == 0; - _source.AddOrUpdate(people); - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should return 80 adds"); + public static bool FilterByIsIncluded(Item item) + => item.IsIncluded; - var filtered = people.Where(p => p.Age > 20).OrderBy(p => p.Age).ToArray(); - var expected = _results.Data.Items.OrderBy(p => p.Age).ToArray(); - expected.Should().BeEquivalentTo(filtered, "Incorrect Filter result"); - } - - [Fact] - public void BatchRemoves() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); + public static bool FilterByIdInclusionMask( + int idInclusionMask, + Item item) + => ((item.Id & idInclusionMask) == 0) && item.IsIncluded; - _source.AddOrUpdate(people); - _source.Remove(people); + public static int SelectId(Item item) + => item.Id; + + public required int Id { get; init; } - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should be 80 addes"); - _results.Messages[1].Removes.Should().Be(80, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); + public bool IsIncluded { get; set; } } - [Fact] - public void BatchSuccessiveUpdates() + private static (ICache items, IReadOnlyList> changeSets) GenerateStressItemsAndChangeSets( + int editCount, + int maxChangeCount, + Randomizer randomizer) { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - foreach (var person in people) + // Not exercising Moved, since ChangeAwareCache<> doesn't support it, and I'm too lazy to implement it by hand. + var changeReasons = new[] { - var person1 = person; - _source.AddOrUpdate(person1); - } - - _results.Messages.Count.Should().Be(80, "Should be 100 updates"); - _results.Data.Count.Should().Be(80, "Should be 100 in the cache"); - var filtered = people.Where(p => p.Age > 20).OrderBy(p => p.Age).ToArray(); - _results.Data.Items.OrderBy(p => p.Age).Should().BeEquivalentTo(filtered, "Incorrect Filter result"); - } - - [Fact] - public void Clear() - { - var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - _source.AddOrUpdate(people); - _source.Clear(); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(80, "Should be 80 addes"); - _results.Messages[1].Removes.Should().Be(80, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); - } - - - public void Dispose() - { - _source.Dispose(); - _results.Dispose(); - } - - [Fact] - public void DuplicateKeyWithMerge() - { - const string key = "Adult1"; - var newperson = new Person(key, 30); - - using var results = _source.Connect().Merge(_source.Connect()).Filter(p => p.Age > 20).AsAggregator(); - _source.AddOrUpdate(newperson); // previously this would throw an exception - - results.Messages.Count.Should().Be(2, "Should be 2 messages"); - results.Messages[0].Adds.Should().Be(1, "Should be 1 add"); - results.Messages[1].Updates.Should().Be(1, "Should be 1 update"); - results.Data.Count.Should().Be(1, "Should be cached"); - } + ChangeReason.Add, + ChangeReason.Refresh, + ChangeReason.Remove, + ChangeReason.Update + }; + + // Weights are chosen to make the cache size likely to grow over time, + // exerting more pressure on the system the longer the benchmark runs. + // Also, to prevent bogus operations (E.G. you can't remove an item from an empty cache). + var changeReasonWeightsWhenCountIs0 = new[] + { + 1f, // Add + 0f, // Refresh + 0f, // Remove + 0f // Update + }; - [Fact] - public void Remove() - { - const string key = "Adult1"; - var person = new Person(key, 50); + var changeReasonWeightsOtherwise = new[] + { + 0.30f, // Add + 0.25f, // Refresh + 0.20f, // Remove + 0.25f // Update + }; - _source.AddOrUpdate(person); - _source.Remove(person); + var nextItemId = 1; - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 80 adds"); - _results.Messages[1].Removes.Should().Be(1, "Should be 80 removes"); - _results.Data.Count.Should().Be(0, "Should be nothing cached"); - } + var changeSets = new List>(capacity: editCount); - [Fact] - public void SameKeyChanges() - { - const string key = "Adult1"; + var items = new ChangeAwareCache(); - _source.Edit( - updater => + while (changeSets.Count < changeSets.Capacity) + { + var changeCount = randomizer.Int(1, maxChangeCount); + for (var i = 0; i < changeCount; ++i) { - updater.AddOrUpdate(new Person(key, 50)); - updater.AddOrUpdate(new Person(key, 52)); - updater.AddOrUpdate(new Person(key, 53)); - updater.Remove(key); - }); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 adds"); - _results.Messages[0].Updates.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Removes.Should().Be(1, "Should be 1 remove"); - } - - [Fact] - public void UpdateMatched() + var changeReason = randomizer.WeightedRandom(changeReasons, items.Count switch + { + 0 => changeReasonWeightsWhenCountIs0, + _ => changeReasonWeightsOtherwise + }); + + switch (changeReason) + { + case ChangeReason.Add: + items.AddOrUpdate( + item: new Item() + { + Id = nextItemId, + IsIncluded = randomizer.Bool() + }, + key: nextItemId); + ++nextItemId; + break; + + case ChangeReason.Refresh: + items.Refresh(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1))); + break; + + case ChangeReason.Remove: + items.Remove(items.Keys.ElementAt(randomizer.Int(0, items.Count - 1))); + break; + + case ChangeReason.Update: + var id = items.Keys.ElementAt(randomizer.Int(0, items.Count - 1)); + items.AddOrUpdate( + item: new Item() + { + Id = id, + IsIncluded = randomizer.Bool() + }, + key: id); + break; + } + } + + changeSets.Add(items.CaptureChanges()); + } - { - const string key = "Adult1"; - var newperson = new Person(key, 50); - var updated = new Person(key, 51); - _source.AddOrUpdate(newperson); - _source.AddOrUpdate(updated); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 adds"); - _results.Messages[1].Updates.Should().Be(1, "Should be 1 update"); + return (items, changeSets); } - [Fact] - public void UpdateNotMatched() + private static IReadOnlyList GenerateRandomIdInclusionMasks( + int valueCount, + Randomizer randomizer) { - const string key = "Adult1"; - var newperson = new Person(key, 10); - var updated = new Person(key, 11); - - _source.AddOrUpdate(newperson); - _source.AddOrUpdate(updated); + var values = new List(capacity: valueCount); - _results.Messages.Count.Should().Be(0, "Should be no updates"); - _results.Data.Count.Should().Be(0, "Should nothing cached"); - } + while (values.Count < valueCount) + values.Add(randomizer.Int()); - [Fact] - public void EmptyChanges() - { - IChangeSet? change = null; - - //need to also apply overload on connect as that will also need to provide and empty notification - // [alternatively _source.Connect(x=> x.Age == 20, suppressEmptyChangeSets: false)] instead - using var subscription = _source.Connect(suppressEmptyChangeSets: false) - .Filter(x=> x.Age == 20, false) - .Subscribe(c => change = c); - - change.Should().NotBeNull(); - change!.Count.Should().Be(0); + return values; } } diff --git a/src/DynamicData.Tests/Utilities/CacheChangeSetAssertions.cs b/src/DynamicData.Tests/Utilities/CacheChangeSetAssertions.cs new file mode 100644 index 000000000..bb4e7cb3f --- /dev/null +++ b/src/DynamicData.Tests/Utilities/CacheChangeSetAssertions.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; +using System.Linq; + +using FluentAssertions; + +namespace DynamicData.Tests.Utilities; + +public static class CacheChangeSetAssertions +{ + public static void ShouldHaveRefreshed( + this IChangeSet changeSet, + IEnumerable expectedItems, + string because = "") + => changeSet + .Where(static change => change.Reason is ChangeReason.Refresh) + .Select(static change => change.Current) + .Should().BeEquivalentTo(expectedItems, because); +} diff --git a/src/DynamicData.Tests/Utilities/TestSourceCache.cs b/src/DynamicData.Tests/Utilities/TestSourceCache.cs index 30fe327d4..bdd440a1c 100644 --- a/src/DynamicData.Tests/Utilities/TestSourceCache.cs +++ b/src/DynamicData.Tests/Utilities/TestSourceCache.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; @@ -96,14 +97,30 @@ private void AssertCanMutate() } private IObservable WrapStream(IObservable sourceStream) - => Observable - .Merge( - _error - .Select(static error => (error is not null) - ? Observable.Throw(error!) - : Observable.Empty()) - .Switch(), - sourceStream) - .TakeUntil(_hasCompleted - .Where(static hasCompleted => hasCompleted)); + => Observable.Create(downstreamObserver => + { + var whenCompleted = _hasCompleted + .Where(static hasCompleted => hasCompleted) + .Publish(); + + var subscription = Observable + .Merge( + _error + .Select(static error => (error is not null) + ? Observable.Throw(error!) + : Observable.Empty()) + .Switch(), + sourceStream) + .TakeUntil(whenCompleted) + .SubscribeSafe(downstreamObserver); + + // Delayed connection of the completion event ensures that subscribers always receive an initial changeset, if one is emitted. + var connection = whenCompleted.Connect(); + + return Disposable.Create(() => + { + subscription.Dispose(); + connection.Dispose(); + }); + }); } diff --git a/src/DynamicData/Cache/Internal/DynamicFilter.cs b/src/DynamicData/Cache/Internal/DynamicFilter.cs deleted file mode 100644 index f38526081..000000000 --- a/src/DynamicData/Cache/Internal/DynamicFilter.cs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. -// Roland Pheasant licenses this file to you under the MIT license. -// See the LICENSE file in the project root for full license information. - -using System.Reactive; -using System.Reactive.Disposables; -using System.Reactive.Linq; - -namespace DynamicData.Cache.Internal; - -internal sealed class DynamicFilter(IObservable> source, IObservable> predicateChanged, IObservable? refilterObservable = null, bool suppressEmptyChangeSets = true) - where TObject : notnull - where TKey : notnull -{ - private readonly IObservable> _predicateChanged = predicateChanged ?? throw new ArgumentNullException(nameof(predicateChanged)); - private readonly IObservable> _source = source ?? throw new ArgumentNullException(nameof(source)); - - public IObservable> Run() => Observable.Create>( - observer => - { - var allData = new Cache(); - var filteredData = new ChangeAwareCache(); - Func predicate = _ => false; - - var locker = InternalEx.NewLock(); - - var refresher = LatestPredicateObservable().Synchronize(locker).Select( - p => - { - // set the local predicate - predicate = p; - - // reapply filter using all data from the cache - return filteredData.RefreshFilteredFrom(allData, predicate); - }); - - var dataChanged = _source.Synchronize(locker).Select( - changes => - { - // maintain all data [required to re-apply filter] - allData.Clone(changes); - - // maintain filtered data - filteredData.FilterChanges(changes, predicate); - - // get latest changes - return filteredData.CaptureChanges(); - }); - - var sourceMerged = refresher.Merge(dataChanged); - if (suppressEmptyChangeSets) - { - sourceMerged = sourceMerged.NotEmpty(); - } - - return sourceMerged.SubscribeSafe(observer); - }); - - private IObservable> LatestPredicateObservable() => Observable.Create>( - observable => - { - Func latest = _ => false; - - observable.OnNext(latest); - - var predicateChangedDisposable = _predicateChanged.Subscribe( - predicate => - { - latest = predicate; - observable.OnNext(latest); - }); - - var reapplier = refilterObservable is null ? Disposable.Empty : refilterObservable.Subscribe(_ => observable.OnNext(latest)); - - return new CompositeDisposable(predicateChangedDisposable, reapplier); - }); -} diff --git a/src/DynamicData/Cache/Internal/Filter.WithPredicateState.cs b/src/DynamicData/Cache/Internal/Filter.Dynamic.cs similarity index 80% rename from src/DynamicData/Cache/Internal/Filter.WithPredicateState.cs rename to src/DynamicData/Cache/Internal/Filter.Dynamic.cs index 4c465039a..b450e6976 100644 --- a/src/DynamicData/Cache/Internal/Filter.WithPredicateState.cs +++ b/src/DynamicData/Cache/Internal/Filter.Dynamic.cs @@ -2,6 +2,7 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Linq; using DynamicData.Internal; @@ -9,7 +10,7 @@ namespace DynamicData.Cache.Internal; internal static partial class Filter { - public static class WithPredicateState + public static class Dynamic where TObject : notnull where TKey : notnull { @@ -17,16 +18,19 @@ public static IObservable> Create( IObservable> source, IObservable predicateState, Func predicate, + IObservable reapplyFilter, bool suppressEmptyChangeSets) { source.ThrowArgumentNullExceptionIfNull(nameof(source)); predicateState.ThrowArgumentNullExceptionIfNull(nameof(predicateState)); predicate.ThrowArgumentNullExceptionIfNull(nameof(predicate)); + reapplyFilter.ThrowArgumentNullExceptionIfNull(nameof(reapplyFilter)); return Observable.Create>(downstreamObserver => new Subscription( downstreamObserver: downstreamObserver, predicate: predicate, predicateState: predicateState, + reapplyFilter: reapplyFilter, source: source, suppressEmptyChangeSets: suppressEmptyChangeSets)); } @@ -39,10 +43,13 @@ private sealed class Subscription private readonly Dictionary _itemStatesByKey; private readonly Func _predicate; private readonly IDisposable? _predicateStateSubscription; + private readonly IDisposable? _reapplyFilterSubscription; private readonly IDisposable? _sourceSubscription; private readonly bool _suppressEmptyChangeSets; + private bool _hasInitialized; private bool _hasPredicateStateCompleted; + private bool _hasReapplyFilterCompleted; private bool _hasSourceCompleted; private bool _isLatestPredicateStateValid; private TState _latestPredicateState; @@ -51,6 +58,7 @@ public Subscription( IObserver> downstreamObserver, Func predicate, IObservable predicateState, + IObservable reapplyFilter, IObservable> source, bool suppressEmptyChangeSets) { @@ -65,22 +73,46 @@ public Subscription( var onError = new Action(OnError); + using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate); + _predicateStateSubscription = predicateState .SubscribeSafe( onNext: OnPredicateStateNext, onError: onError, onCompleted: OnPredicateStateCompleted); + _reapplyFilterSubscription = reapplyFilter + .SubscribeSafe( + onNext: OnReapplyFilterNext, + onError: onError, + onCompleted: OnReapplyFilterCompleted); + _sourceSubscription = source .SubscribeSafe( onNext: OnSourceNext, onError: onError, onCompleted: OnSourceCompleted); + + _hasInitialized = true; + + // We withhold completions triggered by the other upstreams until after the source stream has a chance to publish an initial changeset. + // If that happens, we need to publish the completion now. + var needToComplete = _suppressEmptyChangeSets + && _hasPredicateStateCompleted + && !_isLatestPredicateStateValid; + + if (needToComplete) + { + @lock.SwapTo(DownstreamSynchronizationGate); + + _downstreamObserver.OnCompleted(); + } } public void Dispose() { _predicateStateSubscription?.Dispose(); + _reapplyFilterSubscription?.Dispose(); _sourceSubscription?.Dispose(); } @@ -121,7 +153,9 @@ private void OnPredicateStateCompleted() // If we didn't get at least one predicateState value, we can't ever emit any (non-empty) downstream changesets, // no matter how many items come through from source, so just go ahead and complete now. - if (_hasSourceCompleted || (!_isLatestPredicateStateValid && _suppressEmptyChangeSets)) + if (_hasInitialized + && ((_hasReapplyFilterCompleted && _hasSourceCompleted) + || (!_isLatestPredicateStateValid && _suppressEmptyChangeSets))) { @lock.SwapTo(DownstreamSynchronizationGate); @@ -136,36 +170,41 @@ private void OnPredicateStateNext(TState predicateState) _latestPredicateState = predicateState; _isLatestPredicateStateValid = true; - foreach (var key in _itemStatesByKey.Keys) + ReFilter(predicateState); + + var downstreamChanges = AssembleDownstreamChanges(); + if (((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets) && _hasInitialized) { - var itemState = _itemStatesByKey[key]; + @lock.SwapTo(DownstreamSynchronizationGate); - var isIncluded = _predicate.Invoke(predicateState, itemState.Item); + _downstreamObserver.OnNext(downstreamChanges); + } + } - if (isIncluded && !itemState.IsIncluded) - { - _downstreamChangesBuffer.Add(new( - reason: ChangeReason.Add, - key: key, - current: itemState.Item)); - } - else if (!isIncluded && itemState.IsIncluded) - { - _downstreamChangesBuffer.Add(new( - reason: ChangeReason.Remove, - key: key, - current: itemState.Item)); - } + private void OnReapplyFilterCompleted() + { + using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate); - _itemStatesByKey[key] = new() - { - IsIncluded = isIncluded, - Item = itemState.Item - }; + _hasReapplyFilterCompleted = true; + + // If the other two sources have also completed, there's no chance of us ever needing to emit further changesets. + if (_hasPredicateStateCompleted && _hasSourceCompleted) + { + @lock.SwapTo(DownstreamSynchronizationGate); + + _downstreamObserver.OnCompleted(); } + } + + private void OnReapplyFilterNext(Unit value) + { + using var @lock = SwappableLock.CreateAndEnter(UpstreamSynchronizationGate); + + if (_isLatestPredicateStateValid) + ReFilter(_latestPredicateState); var downstreamChanges = AssembleDownstreamChanges(); - if ((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets) + if (((downstreamChanges.Count is not 0) || !_suppressEmptyChangeSets) && _hasInitialized) { @lock.SwapTo(DownstreamSynchronizationGate); @@ -181,7 +220,10 @@ private void OnSourceCompleted() // We can never emit any (non-empty) downstream changes in the future, if the collection is empty // and the source has reported that it'll never change, so go ahead and complete now. - if (_hasPredicateStateCompleted || ((_itemStatesByKey.Count is 0) && _suppressEmptyChangeSets)) + if ((_hasPredicateStateCompleted && _hasReapplyFilterCompleted) + || (_suppressEmptyChangeSets + && ((_itemStatesByKey.Count is 0) + || (_hasPredicateStateCompleted && !_isLatestPredicateStateValid)))) { @lock.SwapTo(DownstreamSynchronizationGate); @@ -320,6 +362,37 @@ private void OnSourceNext(IChangeSet upstreamChanges) _downstreamObserver.OnNext(downstreamChanges); } } + + private void ReFilter(TState predicateState) + { + foreach (var key in _itemStatesByKey.Keys) + { + var itemState = _itemStatesByKey[key]; + + var isIncluded = _predicate.Invoke(predicateState, itemState.Item); + + if (isIncluded && !itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Add, + key: key, + current: itemState.Item)); + } + else if (!isIncluded && itemState.IsIncluded) + { + _downstreamChangesBuffer.Add(new( + reason: ChangeReason.Remove, + key: key, + current: itemState.Item)); + } + + _itemStatesByKey[key] = new() + { + IsIncluded = isIncluded, + Item = itemState.Item + }; + } + } } private readonly struct ItemState diff --git a/src/DynamicData/Cache/Internal/Filter.Static.cs b/src/DynamicData/Cache/Internal/Filter.Static.cs new file mode 100644 index 000000000..70c88b9e1 --- /dev/null +++ b/src/DynamicData/Cache/Internal/Filter.Static.cs @@ -0,0 +1,75 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Linq; + +namespace DynamicData.Cache.Internal; + +internal static partial class Filter +{ + public static class Static + where TObject : notnull + where TKey : notnull + { + public static IObservable> Create( + IObservable> source, + Func filter, + bool suppressEmptyChangeSets) + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + filter.ThrowArgumentNullExceptionIfNull(nameof(filter)); + + return Observable.Create>(downstreamObserver => + { + var downstreamItems = new ChangeAwareCache(); + + return source + .Select(upstreamChanges => + { + foreach (var change in upstreamChanges.ToConcreteType()) + { + switch (change.Reason) + { + case ChangeReason.Add: + if (filter.Invoke(change.Current)) + downstreamItems.Add(change.Current, change.Key); + break; + + // Intentionally not supporting Moved changes, too much work to try and track indexes. + + case ChangeReason.Refresh: + { + var isIncluded = filter.Invoke(change.Current); + var wasIncluded = downstreamItems.Lookup(change.Key).HasValue; + + if (isIncluded && !wasIncluded) + downstreamItems.Add(change.Current, change.Key); + else if (isIncluded && wasIncluded) + downstreamItems.Refresh(change.Key); + else if (!isIncluded && wasIncluded) + downstreamItems.Remove(change.Key); + } + break; + + case ChangeReason.Remove: + downstreamItems.Remove(change.Key); + break; + + case ChangeReason.Update: + if (filter.Invoke(change.Current)) + downstreamItems.AddOrUpdate(change.Current, change.Key); + else + downstreamItems.Remove(change.Key); + break; + } + } + + return downstreamItems.CaptureChanges(); + }) + .Where(downstreamChanges => !suppressEmptyChangeSets || (downstreamChanges.Count is not 0)) + .SubscribeSafe(downstreamObserver); + }); + } + } +} diff --git a/src/DynamicData/Cache/Internal/StaticFilter.cs b/src/DynamicData/Cache/Internal/StaticFilter.cs deleted file mode 100644 index d6fc7cbe5..000000000 --- a/src/DynamicData/Cache/Internal/StaticFilter.cs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. -// Roland Pheasant licenses this file to you under the MIT license. -// See the LICENSE file in the project root for full license information. - -using System.Reactive.Linq; - -namespace DynamicData.Cache.Internal; - -internal sealed class StaticFilter(IObservable> source, Func filter, bool suppressEmptyChangeSets) - where TObject : notnull - where TKey : notnull -{ - public IObservable> Run() => Observable.Create>(observer => - { - ChangeAwareCache? cache = null; - - return source.Subscribe( - changes => - { - cache ??= new ChangeAwareCache(changes.Count); - - cache.FilterChanges(changes, filter); - var filtered = cache.CaptureChanges(); - - if (filtered.Count != 0 || !suppressEmptyChangeSets) - { - observer.OnNext(filtered); - } - }, - observer.OnError, - observer.OnCompleted); - }); -} diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index c67c2cf55..e8e355f38 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -1486,14 +1486,16 @@ public static IObservable>> ExpireAfter< /// The filter. /// By default empty changeset notifications are suppressed for performance reasons. Set to false to publish empty changesets. Doing so can be useful for monitoring loading status. /// An observable which emits change sets. - public static IObservable> Filter(this IObservable> source, Func filter, bool suppressEmptyChangeSets = true) - where TObject : notnull - where TKey : notnull - { - source.ThrowArgumentNullExceptionIfNull(nameof(source)); - - return new StaticFilter(source, filter, suppressEmptyChangeSets).Run(); - } + public static IObservable> Filter( + this IObservable> source, + Func filter, + bool suppressEmptyChangeSets = true) + where TObject : notnull + where TKey : notnull + => Cache.Internal.Filter.Static.Create( + source: source, + filter: filter, + suppressEmptyChangeSets: suppressEmptyChangeSets); /// /// Creates a filtered stream which can be dynamically filtered. @@ -1504,15 +1506,16 @@ public static IObservable> Filter(this /// Observable to change the underlying predicate. /// By default empty changeset notifications are suppressed for performance reasons. Set to false to publish empty changesets. Doing so can be useful for monitoring loading status. /// An observable which emits change sets. - public static IObservable> Filter(this IObservable> source, IObservable> predicateChanged, bool suppressEmptyChangeSets = true) - where TObject : notnull - where TKey : notnull - { - source.ThrowArgumentNullExceptionIfNull(nameof(source)); - predicateChanged.ThrowArgumentNullExceptionIfNull(nameof(predicateChanged)); - - return source.Filter(predicateChanged, Observable.Empty(), suppressEmptyChangeSets); - } + public static IObservable> Filter( + this IObservable> source, + IObservable> predicateChanged, + bool suppressEmptyChangeSets = true) + where TObject : notnull + where TKey : notnull + => source.Filter( + predicateChanged: predicateChanged, + reapplyFilter: Observable.Empty(), + suppressEmptyChangeSets: suppressEmptyChangeSets); /// /// Creates a filtered stream which can be dynamically filtered, based on state values passed through to a static filtering predicate. @@ -1536,10 +1539,11 @@ public static IObservable> Filter Cache.Internal.Filter.WithPredicateState.Create( + => Cache.Internal.Filter.Dynamic.Create( source: source, predicateState: predicateState, predicate: predicate, + reapplyFilter: Observable.Empty(), suppressEmptyChangeSets: suppressEmptyChangeSets); /// @@ -1571,16 +1575,20 @@ public static IObservable> Filter(this /// Observable to re-evaluate whether the filter still matches items. Use when filtering on mutable values. /// By default empty changeset notifications are suppressed for performance reasons. Set to false to publish empty changesets. Doing so can be useful for monitoring loading status. /// An observable which emits change sets. - public static IObservable> Filter(this IObservable> source, IObservable> predicateChanged, IObservable reapplyFilter, bool suppressEmptyChangeSets = true) - where TObject : notnull - where TKey : notnull - { - source.ThrowArgumentNullExceptionIfNull(nameof(source)); - predicateChanged.ThrowArgumentNullExceptionIfNull(nameof(predicateChanged)); - reapplyFilter.ThrowArgumentNullExceptionIfNull(nameof(reapplyFilter)); + public static IObservable> Filter( + this IObservable> source, + IObservable> predicateChanged, + IObservable reapplyFilter, + bool suppressEmptyChangeSets = true) + where TObject : notnull + where TKey : notnull - return new DynamicFilter(source, predicateChanged, reapplyFilter, suppressEmptyChangeSets).Run(); - } + => Cache.Internal.Filter.Dynamic>.Create( + source: source, + predicateState: predicateChanged, + predicate: static (predicate, item) => predicate.Invoke(item), + reapplyFilter: reapplyFilter, + suppressEmptyChangeSets: suppressEmptyChangeSets); /// /// Creates a filtered stream, optimized for stateless/deterministic filtering of immutable items.