1919from kafka .util import Timer
2020
2121log = logging .getLogger ('kafka.coordinator' )
22+ heartbeat_log = logging .getLogger ('kafka.coordinator.heartbeat' )
2223
2324
2425class MemberState (object ):
@@ -449,11 +450,12 @@ def join_group(self, timeout_ms=None):
449450 timeout_ms = timer .timeout_ms )
450451 self .rejoining = True
451452
452- # fence off the heartbeat thread explicitly so that it cannot
453- # interfere with the join group. # Note that this must come after
454- # the call to onJoinPrepare since we must be able to continue
455- # sending heartbeats if that callback takes some time.
456- self ._disable_heartbeat_thread ()
453+ # fence off the heartbeat thread explicitly so that it cannot
454+ # interfere with the join group. # Note that this must come after
455+ # the call to onJoinPrepare since we must be able to continue
456+ # sending heartbeats if that callback takes some time.
457+ log .debug ("Disabling heartbeat thread during join-group" )
458+ self ._disable_heartbeat_thread ()
457459
458460 # ensure that there are no pending requests to the coordinator.
459461 # This is important in particular to avoid resending a pending
@@ -779,7 +781,7 @@ def _handle_group_coordinator_response(self, future, response):
779781 future .failure (error )
780782 else :
781783 error = error_type ()
782- log .error ("Group coordinator lookup for group %s failed: %s" ,
784+ log .error ("Group Coordinator lookup for group %s failed: %s" ,
783785 self .group_id , error )
784786 future .failure (error )
785787
@@ -815,11 +817,11 @@ def _start_heartbeat_thread(self):
815817 raise Errors .UnsupportedVersionError ('Heartbeat APIs require 0.9+ broker' )
816818 with self ._lock :
817819 if self ._heartbeat_thread is None :
818- log .info ('Starting new heartbeat thread' )
820+ heartbeat_log .info ('Starting new heartbeat thread' )
819821 self ._heartbeat_thread = HeartbeatThread (weakref .proxy (self ))
820822 self ._heartbeat_thread .daemon = True
821823 self ._heartbeat_thread .start ()
822- log .debug ("Started heartbeat thread %s" , self ._heartbeat_thread .ident )
824+ heartbeat_log .debug ("Started heartbeat thread %s" , self ._heartbeat_thread .ident )
823825
824826 def _disable_heartbeat_thread (self ):
825827 with self ._lock :
@@ -829,7 +831,7 @@ def _disable_heartbeat_thread(self):
829831 def _close_heartbeat_thread (self , timeout_ms = None ):
830832 with self ._lock :
831833 if self ._heartbeat_thread is not None :
832- log .info ('Stopping heartbeat thread' )
834+ heartbeat_log .info ('Stopping heartbeat thread' )
833835 try :
834836 self ._heartbeat_thread .close (timeout_ms = timeout_ms )
835837 except ReferenceError :
@@ -893,7 +895,7 @@ def _send_heartbeat_request(self):
893895 request = HeartbeatRequest [version ](self .group_id ,
894896 self ._generation .generation_id ,
895897 self ._generation .member_id )
896- log .debug ("Heartbeat: %s[%s] %s" , request .group , request .generation_id , request .member_id ) # pylint: disable-msg=no-member
898+ heartbeat_log .debug ("Heartbeat: %s[%s] %s" , request .group , request .generation_id , request .member_id ) # pylint: disable-msg=no-member
897899 future = Future ()
898900 _f = self ._client .send (self .coordinator_id , request )
899901 _f .add_callback (self ._handle_heartbeat_response , future , time .time ())
@@ -906,38 +908,38 @@ def _handle_heartbeat_response(self, future, send_time, response):
906908 self ._sensors .heartbeat_latency .record ((time .time () - send_time ) * 1000 )
907909 error_type = Errors .for_code (response .error_code )
908910 if error_type is Errors .NoError :
909- log .debug ("Received successful heartbeat response for group %s" ,
911+ heartbeat_log .debug ("Received successful heartbeat response for group %s" ,
910912 self .group_id )
911913 future .success (None )
912914 elif error_type in (Errors .CoordinatorNotAvailableError ,
913915 Errors .NotCoordinatorError ):
914- log .warning ("Heartbeat failed for group %s: coordinator (node %s)"
916+ heartbeat_log .warning ("Heartbeat failed for group %s: coordinator (node %s)"
915917 " is either not started or not valid" , self .group_id ,
916918 self .coordinator ())
917919 self .coordinator_dead (error_type ())
918920 future .failure (error_type ())
919921 elif error_type is Errors .RebalanceInProgressError :
920- log .warning ("Heartbeat failed for group %s because it is"
922+ heartbeat_log .warning ("Heartbeat failed for group %s because it is"
921923 " rebalancing" , self .group_id )
922924 self .request_rejoin ()
923925 future .failure (error_type ())
924926 elif error_type is Errors .IllegalGenerationError :
925- log .warning ("Heartbeat failed for group %s: generation id is not "
927+ heartbeat_log .warning ("Heartbeat failed for group %s: generation id is not "
926928 " current." , self .group_id )
927929 self .reset_generation ()
928930 future .failure (error_type ())
929931 elif error_type is Errors .UnknownMemberIdError :
930- log .warning ("Heartbeat: local member_id was not recognized;"
932+ heartbeat_log .warning ("Heartbeat: local member_id was not recognized;"
931933 " this consumer needs to re-join" )
932934 self .reset_generation ()
933935 future .failure (error_type )
934936 elif error_type is Errors .GroupAuthorizationFailedError :
935937 error = error_type (self .group_id )
936- log .error ("Heartbeat failed: authorization error: %s" , error )
938+ heartbeat_log .error ("Heartbeat failed: authorization error: %s" , error )
937939 future .failure (error )
938940 else :
939941 error = error_type ()
940- log .error ("Heartbeat failed: Unhandled error: %s" , error )
942+ heartbeat_log .error ("Heartbeat failed: Unhandled error: %s" , error )
941943 future .failure (error )
942944
943945
@@ -1003,14 +1005,14 @@ def __init__(self, coordinator):
10031005
10041006 def enable (self ):
10051007 with self .coordinator ._lock :
1006- log .debug ('Enabling heartbeat thread' )
1008+ heartbeat_log .debug ('Enabling heartbeat thread' )
10071009 self .enabled = True
10081010 self .coordinator .heartbeat .reset_timeouts ()
10091011 self .coordinator ._lock .notify ()
10101012
10111013 def disable (self ):
10121014 with self .coordinator ._lock :
1013- log .debug ('Disabling heartbeat thread' )
1015+ heartbeat_log .debug ('Disabling heartbeat thread' )
10141016 self .enabled = False
10151017
10161018 def close (self , timeout_ms = None ):
@@ -1032,24 +1034,24 @@ def close(self, timeout_ms=None):
10321034 timeout_ms = self .coordinator .config ['heartbeat_interval_ms' ]
10331035 self .join (timeout_ms / 1000 )
10341036 if self .is_alive ():
1035- log .warning ("Heartbeat thread did not fully terminate during close" )
1037+ heartbeat_log .warning ("Heartbeat thread did not fully terminate during close" )
10361038
10371039 def run (self ):
10381040 try :
1039- log .debug ('Heartbeat thread started' )
1041+ heartbeat_log .debug ('Heartbeat thread started' )
10401042 while not self .closed :
10411043 self ._run_once ()
10421044
10431045 except ReferenceError :
1044- log .debug ('Heartbeat thread closed due to coordinator gc' )
1046+ heartbeat_log .debug ('Heartbeat thread closed due to coordinator gc' )
10451047
10461048 except RuntimeError as e :
1047- log .error ("Heartbeat thread for group %s failed due to unexpected error: %s" ,
1049+ heartbeat_log .error ("Heartbeat thread for group %s failed due to unexpected error: %s" ,
10481050 self .coordinator .group_id , e )
10491051 self .failed = e
10501052
10511053 finally :
1052- log .debug ('Heartbeat thread closed' )
1054+ heartbeat_log .debug ('Heartbeat thread closed' )
10531055
10541056 def _run_once (self ):
10551057 with self .coordinator ._client ._lock , self .coordinator ._lock :
@@ -1063,16 +1065,16 @@ def _run_once(self):
10631065
10641066 with self .coordinator ._lock :
10651067 if not self .enabled :
1066- log .debug ('Heartbeat disabled. Waiting' )
1068+ heartbeat_log .debug ('Heartbeat disabled. Waiting' )
10671069 self .coordinator ._lock .wait ()
1068- log .debug ('Heartbeat re-enabled.' )
1070+ heartbeat_log .debug ('Heartbeat re-enabled.' )
10691071 return
10701072
10711073 if self .coordinator .state is not MemberState .STABLE :
10721074 # the group is not stable (perhaps because we left the
10731075 # group or because the coordinator kicked us out), so
10741076 # disable heartbeats and wait for the main thread to rejoin.
1075- log .debug ('Group state is not stable, disabling heartbeats' )
1077+ heartbeat_log .debug ('Group state is not stable, disabling heartbeats' )
10761078 self .disable ()
10771079 return
10781080
@@ -1088,14 +1090,14 @@ def _run_once(self):
10881090 # the session timeout has expired without seeing a
10891091 # successful heartbeat, so we should probably make sure
10901092 # the coordinator is still healthy.
1091- log .warning ('Heartbeat session expired, marking coordinator dead' )
1093+ heartbeat_log .warning ('Heartbeat session expired, marking coordinator dead' )
10921094 self .coordinator .coordinator_dead ('Heartbeat session expired' )
10931095
10941096 elif self .coordinator .heartbeat .poll_timeout_expired ():
10951097 # the poll timeout has expired, which means that the
10961098 # foreground thread has stalled in between calls to
10971099 # poll(), so we explicitly leave the group.
1098- log .warning ('Heartbeat poll expired, leaving group' )
1100+ heartbeat_log .warning ('Heartbeat poll expired, leaving group' )
10991101 ### XXX
11001102 # maybe_leave_group acquires client + coordinator lock;
11011103 # if we hold coordinator lock before calling, we risk deadlock
@@ -1106,7 +1108,7 @@ def _run_once(self):
11061108 elif not self .coordinator .heartbeat .should_heartbeat ():
11071109 # poll again after waiting for the retry backoff in case
11081110 # the heartbeat failed or the coordinator disconnected
1109- log .log (0 , 'Not ready to heartbeat, waiting' )
1111+ heartbeat_log .log (0 , 'Not ready to heartbeat, waiting' )
11101112 self .coordinator ._lock .wait (self .coordinator .config ['retry_backoff_ms' ] / 1000 )
11111113
11121114 else :
0 commit comments