Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: python
version: 10.6.2
version: 10.6.3
schema: 1
scm: github.com/pubnub/python
sdks:
Expand All @@ -18,7 +18,7 @@ sdks:
distributions:
- distribution-type: library
distribution-repository: package
package-name: pubnub-10.6.2
package-name: pubnub-10.6.3
location: https://pypi.org/project/pubnub/
supported-platforms:
supported-operating-systems:
Expand Down Expand Up @@ -94,8 +94,8 @@ sdks:
-
distribution-type: library
distribution-repository: git release
package-name: pubnub-10.6.2
location: https://github.com/pubnub/python/releases/download/10.6.2/pubnub-10.6.2.tar.gz
package-name: pubnub-10.6.3
location: https://github.com/pubnub/python/releases/download/10.6.3/pubnub-10.6.3.tar.gz
supported-platforms:
supported-operating-systems:
Linux:
Expand Down Expand Up @@ -169,6 +169,15 @@ sdks:
license-url: https://github.com/encode/httpx/blob/master/LICENSE.md
is-required: Required
changelog:
- date: 2026-04-20
version: 10.6.3
changes:
- type: bug
text: "The retry limit was silently clamped to the policy default, configured intervals were ignored by delay calculators, and the async reconnection loop never checked the limit or advanced its counter — fix all three and add a `maximum_reconnection_interval` config option."
- type: improvement
text: "`seqn` is not required by PubNub REST API, so remove `PublishSequenceManager`, all its subclasses, `MAX_SEQUENCE`, and `seqn` injection."
- type: improvement
text: "Cover `LinearDelay`, `ExponentialDelay`, `ReconnectionManager`, `ReconnectEffect`, and `HeartbeatDelayedEffect` with deterministic assertions for default, custom, and edge cases."
- date: 2026-03-26
version: 10.6.2
changes:
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 10.6.3
April 20 2026

#### Fixed
- The retry limit was silently clamped to the policy default, configured intervals were ignored by delay calculators, and the async reconnection loop never checked the limit or advanced its counter — fix all three and add a `maximum_reconnection_interval` config option.

#### Modified
- `seqn` is not required by PubNub REST API, so remove `PublishSequenceManager`, all its subclasses, `MAX_SEQUENCE`, and `seqn` injection.
- Cover `LinearDelay`, `ExponentialDelay`, `ReconnectionManager`, `ReconnectEffect`, and `HeartbeatDelayedEffect` with deterministic assertions for default, custom, and edge cases.

## 10.6.2
March 26 2026

Expand Down
16 changes: 8 additions & 8 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def __init__(self, pubnub_instance, event_engine_instance,
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.interval = pubnub_instance.config.reconnection_interval
self.maximum_interval = pubnub_instance.config.maximum_reconnection_interval

if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
Expand All @@ -177,11 +178,10 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs):

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
elif self.interval is None:
delay = LinearDelay.calculate(attempts)
delay = ExponentialDelay.calculate(
attempts, minimum_delay=self.interval, maximum_delay=self.maximum_interval)
else:
delay = self.interval
delay = LinearDelay.calculate(attempts, delay=self.interval)

return delay

Expand Down Expand Up @@ -367,6 +367,7 @@ def __init__(self, pubnub_instance, event_engine_instance,
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.interval = pubnub_instance.config.reconnection_interval
self.maximum_interval = pubnub_instance.config.maximum_reconnection_interval

if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
Expand All @@ -387,11 +388,10 @@ def _should_give_up(self, attempts):

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
elif self.interval is None:
delay = LinearDelay.calculate(attempts)
delay = ExponentialDelay.calculate(
attempts, minimum_delay=self.interval, maximum_delay=self.maximum_interval)
else:
delay = self.interval
delay = LinearDelay.calculate(attempts, delay=self.interval)

return delay

Expand Down
39 changes: 15 additions & 24 deletions pubnub/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,6 @@
logger = logging.getLogger("pubnub")


class PublishSequenceManager:
def __init__(self, provided_max_sequence):
self.max_sequence = provided_max_sequence
self.next_sequence = 0

@abstractmethod
def get_next_sequence(self):
if self.max_sequence == self.next_sequence:
self.next_sequence = 1
else:
self.next_sequence += 1
return self.next_sequence


class BasePathManager(object):
MAX_SUBDOMAIN = 20
DEFAULT_SUBDOMAIN = "pubsub"
Expand Down Expand Up @@ -64,12 +50,14 @@ def set_reconnection_listener(self, reconnection_callback):
def _recalculate_interval(self):
policy = self._pubnub.config.reconnect_policy
interval = self._pubnub.config.reconnection_interval
if policy == PNReconnectionPolicy.LINEAR and interval is not None:
self._timer_interval = interval
elif policy == PNReconnectionPolicy.LINEAR:
self._timer_interval = LinearDelay.calculate(self._connection_errors)
if policy == PNReconnectionPolicy.LINEAR:
self._timer_interval = LinearDelay.calculate(self._connection_errors, delay=interval)
else:
self._timer_interval = ExponentialDelay.calculate(self._connection_errors)
self._timer_interval = ExponentialDelay.calculate(
self._connection_errors,
minimum_delay=interval,
maximum_delay=self._pubnub.config.maximum_reconnection_interval
)

def _retry_limit_reached(self):
user_limit = self._pubnub.config.maximum_reconnection_retries
Expand All @@ -83,7 +71,7 @@ def _retry_limit_reached(self):
policy_limit = (LinearDelay.MAX_RETRIES if policy == PNReconnectionPolicy.LINEAR
else ExponentialDelay.MAX_RETRIES)
if user_limit is not None:
return self._connection_errors >= min(user_limit, policy_limit)
return self._connection_errors >= user_limit
return self._connection_errors > policy_limit

@abstractmethod
Expand All @@ -101,8 +89,9 @@ class LinearDelay:
MAX_RETRIES = 10

@classmethod
def calculate(cls, attempt: int):
return cls.INTERVAL + round(random.random(), 3)
def calculate(cls, attempt: int, delay=None):
base = delay if delay is not None else cls.INTERVAL
return base + round(random.random(), 3)


class ExponentialDelay:
Expand All @@ -112,8 +101,10 @@ class ExponentialDelay:
MAX_BACKOFF = 150

@classmethod
def calculate(cls, attempt: int) -> int:
return min(cls.MAX_BACKOFF, cls.MIN_DELAY * (2 ** attempt)) + round(random.random(), 3)
def calculate(cls, attempt: int, minimum_delay=None, maximum_delay=None) -> float:
min_delay = minimum_delay if minimum_delay is not None else cls.MIN_DELAY
max_backoff = maximum_delay if maximum_delay is not None else cls.MAX_BACKOFF
return min(max_backoff, min_delay * (2 ** attempt)) + round(random.random(), 3)


class StateManager:
Expand Down
3 changes: 2 additions & 1 deletion pubnub/pnconfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def __init__(self,
self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES
self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries
self.reconnection_interval = None # if None is left the default value from LinearDelay is used
self.reconnection_interval = None # if None is left the default value from LinearDelay/ExponentialDelay is used
self.maximum_reconnection_interval = None # if None the default value from ExponentialDelay is used
self.daemon = False
self.use_random_initialization_vector = True
self.suppress_leave_events = False
Expand Down
29 changes: 3 additions & 26 deletions pubnub/pubnub.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
- PubNub: Main class for interacting with PubNub services
- NativeSubscriptionManager: Handles channel subscriptions and message processing
- NativeReconnectionManager: Manages network reconnection strategies
- NativePublishSequenceManager: Manages message sequence numbers for publishing
- SubscribeListener: Helper class for handling subscription events
- NonSubscribeListener: Helper class for handling non-subscription operations

Expand Down Expand Up @@ -40,7 +39,7 @@
- The SDK uses multiple threads for different operations
- SubscribeMessageWorker runs in a daemon thread
- Heartbeat and reconnection timers run in separate threads
- Thread-safe implementations for sequence management and message queuing
- Thread-safe implementations for message queuing

Error Handling:
- Automatic retry mechanisms for failed operations
Expand Down Expand Up @@ -70,7 +69,7 @@
from pubnub.endpoints.presence.leave import Leave
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
from pubnub.managers import SubscriptionManager, ReconnectionManager
from pubnub.models.consumer.common import PNStatus
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_core import PubNubCore
Expand Down Expand Up @@ -124,8 +123,6 @@ def __init__(self, config: PNConfiguration, *, custom_request_handler: Type[Base
if self.config.enable_subscribe:
self._subscription_manager = NativeSubscriptionManager(self)

self._publish_sequence_manager = PublishSequenceManager(PubNubCore.MAX_SEQUENCE)

def sdk_platform(self) -> str:
"""Get the SDK platform identifier.

Expand Down Expand Up @@ -205,12 +202,7 @@ def request_async(self, endpoint_name, endpoint_call_options, callback, cancella
)

def merge_in_params(self, options):
params_to_merge_in = {}

if options.operation_type == PNOperationType.PNPublishOperation:
params_to_merge_in['seqn'] = self._publish_sequence_manager.get_next_sequence()

options.merge_params_in(params_to_merge_in)
options.merge_params_in({})

def stop(self):
"""Stop all subscriptions and clean up resources.
Expand Down Expand Up @@ -305,21 +297,6 @@ def stop_heartbeat_timer(self):
self._timer.cancel()


class NativePublishSequenceManager(PublishSequenceManager):
def __init__(self, provided_max_sequence):
super(NativePublishSequenceManager, self).__init__(provided_max_sequence)
self._lock = threading.Lock()

def get_next_sequence(self):
with self._lock:
if self.max_sequence == self.next_sequence:
self.next_sequence = 1
else:
self.next_sequence += 1

return self.next_sequence


class NativeSubscriptionManager(SubscriptionManager):
"""Manages channel subscriptions and message processing.

Expand Down
34 changes: 10 additions & 24 deletions pubnub/pubnub_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
- AsyncioSubscriptionManager: Async implementation of subscription handling
- EventEngineSubscriptionManager: Event-driven subscription management
- AsyncioReconnectionManager: Async network reconnection handling
- AsyncioPublishSequenceManager: Async message sequence management

Features:
- Asynchronous publish/subscribe messaging
- Non-blocking network operations
Expand Down Expand Up @@ -72,7 +70,7 @@ async def main():
from pubnub.request_handlers.base import BaseRequestHandler
from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler, WallClockTimeoutError
from pubnub.workers import SubscribeMessageWorker
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
from pubnub.managers import SubscriptionManager, ReconnectionManager
from pubnub import utils
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
from pubnub.callbacks import SubscribeCallback, ReconnectionCallback
Expand Down Expand Up @@ -153,8 +151,6 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None, *,
if self.config.enable_subscribe:
self._subscription_manager = subscription_manager(self)

self._publish_sequence_manager = AsyncioPublishSequenceManager(self.event_loop, PubNubCore.MAX_SEQUENCE)

@property
def _connector(self):
return self._request_handler._connector
Expand Down Expand Up @@ -317,6 +313,13 @@ async def _register_heartbeat_timer(self):
reconnection attempt based on the current state.
"""
while True:
if self._retry_limit_reached():
logger.warning("Reconnection retry limit reached. Disconnecting.")
disconnect_status = PNStatus()
disconnect_status.category = PNStatusCategory.PNDisconnectedCategory
self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status)
break

self._recalculate_interval()
await asyncio.sleep(self._timer_interval)
logger.debug("reconnect loop at: %s" % utils.datetime_now())
Expand All @@ -327,9 +330,8 @@ async def _register_heartbeat_timer(self):
self._callback.on_reconnect()
break
except Exception:
if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.EXPONENTIAL:
logger.debug("reconnect interval increment at: %s" % utils.datetime_now())
self._connection_errors += 1
logger.debug("reconnect interval increment at: %s" % utils.datetime_now())
self._connection_errors += 1

def start_polling(self):
"""Start the reconnection polling process."""
Expand All @@ -345,22 +347,6 @@ def stop_polling(self):
self._task.cancel()


class AsyncioPublishSequenceManager(PublishSequenceManager):
def __init__(self, ioloop, provided_max_sequence):
super(AsyncioPublishSequenceManager, self).__init__(provided_max_sequence)
self._lock = asyncio.Lock()
self._event_loop = ioloop

async def get_next_sequence(self):
async with self._lock:
if self.max_sequence == self.next_sequence:
self.next_sequence = 1
else:
self.next_sequence += 1

return self.next_sequence


class AsyncioSubscriptionManager(SubscriptionManager):
"""Manages channel subscriptions and message processing.

Expand Down
2 changes: 0 additions & 2 deletions pubnub/pubnub_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ class PubNubCore:
SDK_NAME: str = "PubNub-Python"

TIMESTAMP_DIVIDER: int = 1000
MAX_SEQUENCE: int = 65535

__metaclass__ = ABCMeta
__crypto: Optional[PubNubCryptoModule] = None
Expand All @@ -190,7 +189,6 @@ def __init__(self, config: PNConfiguration) -> None:
}

self._subscription_manager = None
self._publish_sequence_manager = None
self._base_path_manager = BasePathManager(config)
self._token_manager = TokenManager()
self._subscription_registry = PNSubscriptionRegistry(self)
Expand Down
9 changes: 2 additions & 7 deletions pubnub/request_handlers/async_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from asyncio import Event
from pubnub import utils
from pubnub.enums import PNOperationType, PNStatusCategory
from pubnub.enums import PNStatusCategory
from pubnub.errors import PNERR_CLIENT_ERROR, PNERR_JSON_DECODING_FAILED, PNERR_SERVER_ERROR
from pubnub.exceptions import PubNubException
from pubnub.models.envelopes import AsyncioEnvelope
Expand Down Expand Up @@ -73,12 +73,7 @@ async def async_request(self, options_func, cancellation_event):
create_status = options.create_status
create_exception = options.create_exception

params_to_merge_in = {}

if options.operation_type == PNOperationType.PNPublishOperation:
params_to_merge_in['seqn'] = await self.pubnub._publish_sequence_manager.get_next_sequence()

options.merge_params_in(params_to_merge_in)
options.merge_params_in({})

if options.use_base_path:
url = utils.build_url(self.pubnub.config.scheme(), self.pubnub.base_origin, options.path,
Expand Down
9 changes: 2 additions & 7 deletions pubnub/request_handlers/async_httpx.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import urllib

from pubnub import utils
from pubnub.enums import PNOperationType, PNStatusCategory
from pubnub.enums import PNStatusCategory
from pubnub.errors import PNERR_CLIENT_ERROR, PNERR_JSON_DECODING_FAILED, PNERR_SERVER_ERROR
from pubnub.exceptions import PubNubException
from pubnub.models.envelopes import AsyncioEnvelope
Expand Down Expand Up @@ -115,12 +115,7 @@ async def async_request(self, options_func, cancellation_event):
create_status = options.create_status
create_exception = options.create_exception

params_to_merge_in = {}

if options.operation_type == PNOperationType.PNPublishOperation:
params_to_merge_in['seqn'] = await self.pubnub._publish_sequence_manager.get_next_sequence()

options.merge_params_in(params_to_merge_in)
options.merge_params_in({})

if options.use_base_path:
url = utils.build_url(self.pubnub.config.scheme(), self.pubnub.base_origin, options.path,
Expand Down
Loading
Loading