Combine.Publisher.flatMap memory leak when publisher fails
Originator: | slammer0 | ||
Number: | rdar://FB9860241 | Date Originated: | Jan 27, 2022 at 11:16 AM |
Status: | Open | Resolved: | |
Product: | Something else not on this list | Product Version: | |
Classification: | Incorrect/Unexpected Behavior | Reproducible: | yes |
Please describe the issue: When flatMap operator is used and failing publisher is returned from transform block, it never cancels upstream and blocks any cancelling efforts causing memory leak. Please list the steps you took to reproduce the issue: publisher.flatMap { _ in Fail(error: MyError.error) } That's all but please find attached swift with example that can be executed What did you expect to happen? subscription to publisher gets cancelled please see example in attached file where I wrote draft of fixed flatMap by returning Result and then using tryMap outside the transform block What actually happened? subscription to publisher is still active, calling cancel on stream cancellable has no effect, there is no way to cancel the subscription Attached file: CombineFlatMapIssue.swift https://gist.github.com/piotrtobolski/b6fa065194935f2d083e96375e3ca432 import Combine enum MyError: Error { case error } let subject1 = PassthroughSubject<Int, Error>() let cancellable1 = subject1 .print("before flatMap") .flatMap { _ in Fail<Int, Error>(error: MyError.error) } .print("after flatMap") .sink { print("completion: \($0)") } receiveValue: { print("receiveValue: \($0)") } print("Sending 1") subject1.send(1) print("Sending 2") subject1.send(2) print("Cancelling") cancellable1.cancel() print("Sending 3") subject1.send(3) extension Publisher { func flatMap2<T, P>( maxPublishers: Subscribers.Demand = .unlimited, _ transform: @escaping (Self.Output) -> P ) -> AnyPublisher<T, Failure> where T == P.Output, P : Publisher, Self.Failure == P.Failure { flatMap(maxPublishers: maxPublishers) { transform($0) .map { value -> Result<T, Failure> in Result.success(value) } .catch { return Just<Result<T, Failure>>(Result.failure($0)) } } .tryMap { result in switch result { case let .success(value): return value case let .failure(error): throw error } } .mapError { error in error as! Failure } .eraseToAnyPublisher() } } print("-----------------") let subject2 = PassthroughSubject<Int, Error>() let cancellable2 = subject2 .print("before flatMap") .flatMap2 { _ in Fail<Int, Error>(error: MyError.error) } .print("after flatMap") .sink { print("completion: \($0)") } receiveValue: { print("receiveValue: \($0)") } print("Sending 1") subject2.send(1) print("Sending 2") subject2.send(2) print("Cancelling") cancellable2.cancel() print("Sending 3") subject2.send(3) /* output: after flatMap: receive subscription: (FlatMap) after flatMap: request unlimited before flatMap: receive subscription: (PassthroughSubject) before flatMap: request unlimited Sending 1 before flatMap: receive value: (1) after flatMap: receive error: (error) completion: failure(__lldb_expr_13.MyError.error) Sending 2 before flatMap: receive value: (2) Cancelling Sending 3 before flatMap: receive value: (3) ----------------- after flatMap: receive subscription: (TryMap) after flatMap: request unlimited before flatMap: receive subscription: (PassthroughSubject) before flatMap: request unlimited Sending 1 before flatMap: receive value: (1) before flatMap: receive cancel after flatMap: receive error: (error) completion: failure(__lldb_expr_13.MyError.error) Sending 2 Cancelling Sending 3 */
Comments
Please note: Reports posted here will not necessarily be seen by Apple. All problems should be submitted at bugreport.apple.com before they are posted here. Please only post information for Radars that you have filed yourself, and please do not include Apple confidential information in your posts. Thank you!