123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- 'use strict';
- // https://github.com/zenparsing/es-observable
- var $export = require('./_export');
- var global = require('./_global');
- var core = require('./_core');
- var microtask = require('./_microtask')();
- var OBSERVABLE = require('./_wks')('observable');
- var aFunction = require('./_a-function');
- var anObject = require('./_an-object');
- var anInstance = require('./_an-instance');
- var redefineAll = require('./_redefine-all');
- var hide = require('./_hide');
- var forOf = require('./_for-of');
- var RETURN = forOf.RETURN;
- var getMethod = function (fn) {
- return fn == null ? undefined : aFunction(fn);
- };
- var cleanupSubscription = function (subscription) {
- var cleanup = subscription._c;
- if (cleanup) {
- subscription._c = undefined;
- cleanup();
- }
- };
- var subscriptionClosed = function (subscription) {
- return subscription._o === undefined;
- };
- var closeSubscription = function (subscription) {
- if (!subscriptionClosed(subscription)) {
- subscription._o = undefined;
- cleanupSubscription(subscription);
- }
- };
- var Subscription = function (observer, subscriber) {
- anObject(observer);
- this._c = undefined;
- this._o = observer;
- observer = new SubscriptionObserver(this);
- try {
- var cleanup = subscriber(observer);
- var subscription = cleanup;
- if (cleanup != null) {
- if (typeof cleanup.unsubscribe === 'function') cleanup = function () { subscription.unsubscribe(); };
- else aFunction(cleanup);
- this._c = cleanup;
- }
- } catch (e) {
- observer.error(e);
- return;
- } if (subscriptionClosed(this)) cleanupSubscription(this);
- };
- Subscription.prototype = redefineAll({}, {
- unsubscribe: function unsubscribe() { closeSubscription(this); }
- });
- var SubscriptionObserver = function (subscription) {
- this._s = subscription;
- };
- SubscriptionObserver.prototype = redefineAll({}, {
- next: function next(value) {
- var subscription = this._s;
- if (!subscriptionClosed(subscription)) {
- var observer = subscription._o;
- try {
- var m = getMethod(observer.next);
- if (m) return m.call(observer, value);
- } catch (e) {
- try {
- closeSubscription(subscription);
- } finally {
- throw e;
- }
- }
- }
- },
- error: function error(value) {
- var subscription = this._s;
- if (subscriptionClosed(subscription)) throw value;
- var observer = subscription._o;
- subscription._o = undefined;
- try {
- var m = getMethod(observer.error);
- if (!m) throw value;
- value = m.call(observer, value);
- } catch (e) {
- try {
- cleanupSubscription(subscription);
- } finally {
- throw e;
- }
- } cleanupSubscription(subscription);
- return value;
- },
- complete: function complete(value) {
- var subscription = this._s;
- if (!subscriptionClosed(subscription)) {
- var observer = subscription._o;
- subscription._o = undefined;
- try {
- var m = getMethod(observer.complete);
- value = m ? m.call(observer, value) : undefined;
- } catch (e) {
- try {
- cleanupSubscription(subscription);
- } finally {
- throw e;
- }
- } cleanupSubscription(subscription);
- return value;
- }
- }
- });
- var $Observable = function Observable(subscriber) {
- anInstance(this, $Observable, 'Observable', '_f')._f = aFunction(subscriber);
- };
- redefineAll($Observable.prototype, {
- subscribe: function subscribe(observer) {
- return new Subscription(observer, this._f);
- },
- forEach: function forEach(fn) {
- var that = this;
- return new (core.Promise || global.Promise)(function (resolve, reject) {
- aFunction(fn);
- var subscription = that.subscribe({
- next: function (value) {
- try {
- return fn(value);
- } catch (e) {
- reject(e);
- subscription.unsubscribe();
- }
- },
- error: reject,
- complete: resolve
- });
- });
- }
- });
- redefineAll($Observable, {
- from: function from(x) {
- var C = typeof this === 'function' ? this : $Observable;
- var method = getMethod(anObject(x)[OBSERVABLE]);
- if (method) {
- var observable = anObject(method.call(x));
- return observable.constructor === C ? observable : new C(function (observer) {
- return observable.subscribe(observer);
- });
- }
- return new C(function (observer) {
- var done = false;
- microtask(function () {
- if (!done) {
- try {
- if (forOf(x, false, function (it) {
- observer.next(it);
- if (done) return RETURN;
- }) === RETURN) return;
- } catch (e) {
- if (done) throw e;
- observer.error(e);
- return;
- } observer.complete();
- }
- });
- return function () { done = true; };
- });
- },
- of: function of() {
- for (var i = 0, l = arguments.length, items = new Array(l); i < l;) items[i] = arguments[i++];
- return new (typeof this === 'function' ? this : $Observable)(function (observer) {
- var done = false;
- microtask(function () {
- if (!done) {
- for (var j = 0; j < items.length; ++j) {
- observer.next(items[j]);
- if (done) return;
- } observer.complete();
- }
- });
- return function () { done = true; };
- });
- }
- });
- hide($Observable.prototype, OBSERVABLE, function () { return this; });
- $export($export.G, { Observable: $Observable });
- require('./_set-species')('Observable');
|