
    9j.             	       ~   U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d d	l m!Z!m"Z" d d
l#m$Z$ d dl%Z%d dl&Z%d dl'Z%d dl(m)Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZB d dlCmDZDmEZEmFZF  ej                  eH      ZIeIj                  ej                         g dZLddgZMe=xs e>xs eAZN G d de"      ZOi d eOdd      d eOdd      d eOdd      d eOd d!      d" eOd#d$      d% eOd&d'      d( eOd)d*      d+ eOd,d-      d. eOd/d0      d1 eOd2d3      d4 eOd5d6      d7 eOd8d9      d: eOd;d<      d= eOd>d?      d@ eOdAdB      dC eOdDdE      dF eOdGdH      dI eOdJdK      iZPe G dL dM             ZQdN ZRdO ZSdP ZTdQ ZUdR ZVdS ZWdT ZXdUeYfdVZZdWdXdYZ[dZe\fd[Z]d\e!d]e\dUe\fd^Z^d_ Z_d` Z`da Zadb Zbdc Zcdd Zdde Zedf Zfdg Zgdh ZhddiZidj Zjdk Zk e9dl       ZleYemdm<   dn Zndoeoepdpf   fdqZqddrZrds Zsdte%j                  due\dve\dUeYfdwZue:dxdydz ed{|      dzdWdzfd}       Zve@rd~Zwn e\ ej                  dd            Zwdd~dZye?rdeyd<   ddeYfdZzdUe\fdZ{ed        Z|dde\de\de\fdZ}de\depfdZ~daej                   dz  emd<   ddepdz  dUdfdZddZ	 dde%j
                  j                  de%j
                  j                  de\dz  dUe!fdZe?r+ ed ede%j                  j                                     ZndZ G d deB      Z G d de      Zdeepee!   f   dede!fdZej                   dUeYfd       Zd ZdewefdZ G d deB      Z G d de,j*                        Z G d de,j*                        Ze	 dd       Z G d de%j2                  j4                  j                        Z G d de      Z G d deB      Zy)    N)Callable)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)Any
NamedTuple)patch)
DeviceType)_SymmetricMemory)	trace_log)common_utils)FILE_SCHEMAfind_free_portIS_SANDCASTLELazyValretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_if	TEST_CUDATEST_HPUTEST_WITH_ROCMTEST_WITH_TSANTEST_XPUTestCase)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroupncclxcclhcclcudaxpuc                   "    e Zd ZU eed<   eed<   y)TestSkip	exit_codemessageN)__name__
__module____qualname__int__annotations__str     j/media/conek/DATA/Code/OCR/venv/lib/python3.12/site-packages/torch/testing/_internal/common_distributed.pyr*   r*   E   s    NLr4   r*   backend_unavailableH   z5Skipped because distributed backend is not available.small_worldsizeI   z Skipped due to small world size.odd_worldsizeW   zSkipped due to odd world size.no_cudaJ   zCUDA is not available.zmulti-gpu-1K   zNeed at least 1 CUDA devicezmulti-gpu-2M   zNeed at least 2 CUDA deviceszmulti-gpu-3P   zNeed at least 3 CUDA deviceszmulti-gpu-4Q   zNeed at least 4 CUDA deviceszmulti-gpu-5R   zNeed at least 5 CUDA deviceszmulti-gpu-6S   zNeed at least 6 CUDA deviceszmulti-gpu-7T   zNeed at least 7 CUDA deviceszmulti-gpu-8U   zNeed at least 8 CUDA devicesr$   L   z#c10d not compiled with NCCL support
skipIfRocmN   zTest skipped for ROCmno_peer_accessO   z'Test skipped because no GPU peer accessgenericV   zHTest skipped at subprocess level, look at subprocess log for skip reasonimporterrorX   z"Test skipped due to missing importno_acceleratorY   zaccelerator is not available.c                       e Zd Zi Zh ded<    e       ed<   h ded<   h ded<   i Zh ded<   h ded	<   h ded
<   h ded<    e       ed<   erdhed<   erdhed<   yy)DistTestCases>   mpiuccr$   r%   allgather_coalescedr	   >   rT   r$   r%   zsendrecv anysourcezcpu barrier>   rT   gloor$   gpur'   ddpsubgrouppluginr&   hpur%   r(   N)r-   r.   r/   skip_collectivesetbackend_featurer   r   r3   r4   r5   rR   rR   d   s     O-KO)* #OH,CO()%<OM" O4OE5OF4OE"9OJ #OH"("( r4   rR   c                     | t         v S N)DDP_RANK_DEVICES)devices    r5   requires_ddp_rankrc   z   s    %%%r4   c                 .     t                fd       }|S )zSkips if the world size exceeds the number of GPUs, ensuring that if the
    test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                     t         s2t        s,t        s&t        j                  t
        d   j                         t        t        j                  d         }t         rJt        j                  j                         |k  r)t        j                  t
        d|    j                         t        rJt        j                  j                         |k  r)t        j                  t
        d|    j                         t        rJt        j                  j                         |k  r)t        j                  t
        d|    j                          | i |S )Nr<   
WORLD_SIZE
multi-gpu-)r   r   r   sysexit
TEST_SKIPSr+   r0   osenvirontorchr'   device_countr[   r(   )argskwargs
world_sizefuncs      r5   wrapperzskip_if_no_gpu.<locals>.wrapper   s    XHHZ	*445L12
002Z?HHZ*ZL 9:DDE		..0:=HHZ*ZL 9:DDE		..0:=HHZ*ZL 9:DDET$V$$r4   r
   rr   rs   s   ` r5   skip_if_no_gpurv   ~   s"     4[% % Nr4   c                 .     t                fd       }|S )Nc                      t         j                  d   dk7  rEt        t         j                  d         dk  r&t        j                  t
        d   j                          | i |S )NBACKENDrS   rf      r8   rk   rl   r0   rh   ri   rj   r+   ro   rp   rr   s     r5   rs   z(skip_if_small_worldsize.<locals>.wrapper   sR    JJy!U*BJJ|4L0MPQ0QHHZ 12<<=T$V$$r4   rt   ru   s   ` r5   skip_if_small_worldsizer}           
4[% % Nr4   c                 .     t                fd       }|S )Nc                      t         j                  d   dk7  rHt        t         j                  d         dz  dk(  r&t        j                  t
        d   j                          | i |S )Nry   rS   rf         r:   r{   r|   s     r5   rs   z&skip_if_odd_worldsize.<locals>.wrapper   sW    JJy!U*BJJ|4L0MPQ0QUV0VHHZ0::;T$V$$r4   rt   ru   s   ` r5   skip_if_odd_worldsizer      r~   r4   c                       fd}|S )Nc                 4     t                fd       }|S )Nc                      dk(  rKt         j                  j                         k  r*t        j                  t
        d    j                         y  | i |S Nr$   rg   )rm   r'   rn   rh   ri   rj   r+   )ro   rp   backendrr   ns     r5   rs   zCrequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapper   sM    & UZZ%<%<%>%Bj$45??@T,V,,r4   rt   )rr   rs   r   r   s   ` r5   	decoratorz2require_n_gpus_for_nccl_backend.<locals>.decorator   s     	t	- 
	- r4   r3   )r   r   r   s   `` r5   require_n_gpus_for_nccl_backendr      s     r4   c                      d } | S )Nc                 .     t                fd       }|S )Nc                      	 ddl m}m}  | i |S # t        $ r) t	        j
                  t        d   j                         Y y w xY w)Nr   )AutoModelForMaskedLM
BertConfigrM   )transformersr   r   ImportErrorrh   ri   rj   r+   )ro   rp   r   r   rr   s       r5   rs   z?import_transformers_or_skip.<locals>.decorator.<locals>.wrapper   sA    >IT,V,, >M2<<=>s    /AArt   ru   s   ` r5   r   z.import_transformers_or_skip.<locals>.decorator   s     	t	> 
	> r4   r3   )r   s    r5   import_transformers_or_skipr      s    
 r4   c                     t         r"t        j                  j                         | k\  ryt        r"t        j
                  j                         | k\  ryt        r"t        j                  j                         | k\  ryyNTF)r   rm   r'   rn   r   r[   r   r(   )xs    r5   at_least_x_gpur      sS    UZZ,,.!3EII**,1EII**,1r4   returnc                 V    t        | d   dd       }t        |       dk(  s|y ||       y)Nr   _handle_test_skipFT)getattrlen)ro   msgr   s      r5   _maybe_handle_skip_if_lt_x_gpur      s5    Q)<dC
4yA~*2cr4   F)	allow_cpuc                      fd}|S )zSkip if fewer than x accelerators available.

    Args:
        x: Minimum number of accelerators required.
        allow_cpu: If True, run the test on CPU-only machines (no accelerators).
    c                 4     t                fd       }|S )Nc                  <   t         j                  j                         r)t         j                  j                         k\  r | i |S t        r)t         j
                  j                         k\  r | i |S t        r)t         j                  j                         k\  r | i |S r2t         j                  j                         st        st        s | i |S t        d    }t        | |j                        s t        j                  |j                         y y )Nrg   )rm   r'   is_availablern   r   r[   r   r(   rj   r   r,   rh   ri   r+   )ro   rp   	test_skipr   rr   r   s      r5   rs   z4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   s    zz&&(UZZ-D-D-F!-KT,V,,EII2249T,V,,EII2249T,V,,%**"9"9";x8T,V,,"Zs#34I1$	8I8IJ,,- Kr4   rt   )rr   rs   r   r   s   ` r5   r   z#skip_if_lt_x_gpu.<locals>.decorator   s     	t	. 
	. r4   r3   )r   r   r   s   `` r5   skip_if_lt_x_gpur      s    " r4   r   c                       fd}|S )aR  
    Decorator to request a specific world size for a test. The test harness can
    read this attribute to set the number of ranks to spawn. If there are fewer
    than `n` CUDA devices available, the test should be skipped by the harness.

    Usage:
        @require_world_size(3)
        def test_something(self):
            ...
    c                     | _         t        j                  j                         } t	        j
                  |k\  d d|       |       S )Nz	requires z GPUs, found )_required_world_sizerm   r'   rn   unittest
skipUnless)rr   	availabler   s     r5   r   z&requires_world_size.<locals>.decorator  sR    $%!JJ++-	
x""Nis-	{C

 	r4   r3   )r   r   s   ` r5   requires_world_sizer      s     r4   objdefaultc                     	 t        | d      r%t        | j                        r| j                         n| j                  }t	        | |      }|j
                  }t        |      S # t        $ r |cY S w xY w)z
    Returns the requested world size for the currently running unittest method on `obj`
    if annotated via `@require_world_size(n)`, else returns `default`.
    _current_test_name)hasattrcallabler   _testMethodNamer   r   r0   	Exception)r   r   	test_namefnvalues        r5   get_required_world_sizer     st    
 s01hs?U?U6V ""$$$ 	
 S)$''5z s   AA" "A0/A0c                       fd}|S )Nc                 4     t                fd       }|S )Nc                  2   dk7  r | i |S t         j                  j                         r)t         j                  j                         k\  r | i |S t        d    }t        | |j                        s t        j                  |j                         y y r   )
rm   r'   r   rn   rj   r   r,   rh   ri   r+   )ro   rp   r   r   rr   r   s      r5   rs   z9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper'  s    & T,V,,zz&&(UZZ-D-D-F!-KT,V,,"Zs#34I1$	8I8IJ,,- Kr4   rt   )rr   rs   r   r   s   ` r5   r   z(nccl_skip_if_lt_x_gpu.<locals>.decorator&  s     	t	. 
	. r4   r3   )r   r   r   s   `` r5   nccl_skip_if_lt_x_gpur   %  s     r4   c                    | j                         }d|vrt        d      d|vrt        d      d|vrt        d      |d   }|j                  d      dk(  r|n|j                  d      d	   }||vrt        d
| d|       y )N	iterationz(Expected 'iteration' in ddp_logging_data	has_errorz(Expected 'has_error' in ddp_logging_dataerrorz$Expected 'error' in ddp_logging_dataz
Exception raised from r   zDid not find expected z in ddp logging data error: )_get_ddp_logging_dataAssertionErrorfindsplit)	model_DDP
err_substrddp_logging_datalogging_erractuals        r5   verify_ddp_error_loggedr   6  s     668**GHH**GHH&&CDD"7+K ??56"< 	89!< 
 [ $VH,HV
 	
 !r4   c                 .     t                fd       }|S )aJ  
    Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
    this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
    the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
    TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
    c                     	 t         j                  d   }t         j                  d= 	 t         j                  d   }dt         j                  d<   	  | i |}|||t         j                  d<   ||t         j                  d<   S S # t        $ r d }Y jw xY w# t        $ r d }Y gw xY w# dt         j                  d<   w xY w# ||t         j                  d<   ||t         j                  d<   w w xY w)NTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAIT1)rk   rl   KeyError)ro   rp    cached_nccl_async_error_handlingcached_nccl_blocking_waitretrr   s        r5   rs   z(with_nccl_blocking_wait.<locals>.wrapperT  s   	4;=::1<, 

<=	946JJ*5% 69BJJ12	S''C 0;4 

<= )49R

56 51  	4/3,	4  	-(,%	- 69BJJ12 0;4 

<= )49R

56 5s@   $B B 	B> BBB# B& "B##B& &B;>-C+rt   ru   s   ` r5   with_nccl_blocking_waitr   L  s%     4[ S  SD Nr4   c                       fd}|S )zK
    Runs a test for each distributed debug level specified in levels.
    c                 2     t                fd       }|S )Nc                     t         j                  j                  dd       }D ][  }|t         j                  d<   t        j                           | i |}t        j
                          |I|t         j                  d<   ] S )NTORCH_DISTRIBUTED_DEBUG)rk   rl   getc10dset_debug_level_from_envbarrier)ro   rp   	old_levellevelr   rr   levelss        r5   rs   z:with_dist_debug_levels.<locals>.decorator.<locals>.wrapper  sx    

'@$GI F8=

45--/D+F+(<EBJJ89F Jr4   rt   )rr   rs   r   s   ` r5   r   z)with_dist_debug_levels.<locals>.decorator  s     	t	 
	 r4   r3   )r   r   s   ` r5   with_dist_debug_levelsr   z  s    
$ r4   c                  @    t        t        j                          d      S )Nz+c10d was not compiled with the Gloo backend)r   r   is_gloo_availabler3   r4   r5   requires_gloor     !    )""$$5 r4   c           	         t         sd S t        j                         st        d      S t	        t
        j                  j                  j                         | k  d|  dt
        j                  j                  j                          d|       S )Nc                     | S r`   r3   )fs    r5   <lambda>z'requires_nccl_version.<locals>.<lambda>  s     r4   +c10d was not compiled with the NCCL backendz0Requires NCCL version greater than or equal to: z	, found: z
, reason: )	r   r   is_nccl_availabler   r   rm   r'   r$   version)r   r   s     r5   requires_nccl_versionr     s    !!#*9
 	
 .JJOO##%/>wiyQVQ[Q[Q`Q`QhQhQjPkkuvyuz{
 	
r4   c                      t        dd      S )zK
    Require NCCL shrink support (NCCL available and version >= 2.27).
    )r      z Need NCCL 2.27+ for shrink_group)r   r3   r4   r5   requires_nccl_shrinkr     s     !*LMMr4   c                  @    t        t        j                          d      S )Nr   )r   r   r   r3   r4   r5   requires_ncclr     r   r4   c                  @    t        t        j                          d      S )Nz*c10d was not compiled with the UCC backend)r   r   is_ucc_availabler3   r4   r5   requires_uccr     !    )!!##4 r4   c                  @    t        t        j                          d      S )Nz*c10d was not compiled with the MPI backend)r   r   is_mpi_availabler3   r4   r5   requires_mpir     r   r4   c                 V    | t         } t        d | D              }t        | d|        S )a  
    Decorator to skip tests if no accelerator communication backend (NCCL, XCCL, HCCL) is available.

    Args:
        backends (Optional[List[str]]): Specific accelerator backends to check (e.g., ["nccl", "xccl", "hccl"]).
                                       If None, checks all supported accelerator backends (NCCL, XCCL, HCCL).

    Returns:
        callable: A decorator that skips the test if no specified accelerator backend is available.
    c              3      K   | ]=  }	 t        j                  t         j                  d  dj                  |d               ? yw)c                      t         S r`   )r   r3   r4   r5   r   z=requires_accelerator_dist_backend.<locals>.<genexpr>.<lambda>  s    H r4   r#   c                       yNFr3   r3   r4   r5   r   z=requires_accelerator_dist_backend.<locals>.<genexpr>.<lambda>  s    r4   N)r   r   is_xccl_availabler   ).0r   s     r5   	<genexpr>z4requires_accelerator_dist_backend.<locals>.<genexpr>  sG       	&****$	
 #g}
%		(s   AAz5No accelerator communication backend available among )ACCELERATOR_DIST_BACKENDSanyr   )backendsbackend_availables     r5   !requires_accelerator_dist_backendr    sH     ,     *
?zJ r4   c                      t         j                  j                         xr$ t        j                  t
        j                  d      } t        |  d      S )Nr   z"multicast support is not available)rm   r'   r   r   has_multicast_supportr   CUDAr   )r  s    r5   requires_multicast_supportr
    sI    

! 	G22:??AF  *!!, r4   c                      t         r@t        r9ddg} | D ]/  }|t        j                  j	                  d      j
                  v s/ y yyy)Ngfx942gfx950r   TF)r   r   rm   r'   get_device_propertiesgcnArchName)	arch_listarchs     r5   #evaluate_platform_supports_symm_memr    sL    !8,I!  5::;;A>JJJ  r4   c                      t               S r`   )r  r3   r4   r5   r   r     s
    /1 r4   PLATFORM_SUPPORTS_SYMM_MEMc                 d     t        j                  t        t        d   j                        |       S )z&Skips a test for ROCm multiprocess UTsrG   )r   skipIfr   rj   r,   )rr   s    r5   skip_if_rocm_multiprocessr     s%    L8??>:l+C+K+KLTRRr4   r  .c                       fd}|S )z4Skips a test for given ROCm archs - multiprocess UTsc                     d }t         rDt        j                  j                  d      j                  j                  d      d   }|v rd } t        j                  |d u|      |       S )Nr   :z0skip_if_rocm_arch_multiprocess: test skipped on )r   rm   r'   r  r  r   r   r  )rr   reasonpropr  s      r5   r   z1skip_if_rocm_arch_multiprocess.<locals>.decorator  se    ::33A6BBHHMaPDt|KD6R:xvT16:4@@r4   r3   )r  r   s   ` r5   skip_if_rocm_arch_multiprocessr    s    A r4   c                       fd}|S )z:Skips a test for ROCm based on ROCm ver - multiprocess UTsc                 :   d }t         rut        t        j                  j                        }|j                  dd      d   }t        d |j                  d      D              }||t              k  r	d| d d	} t        j                  |d u|      |       S )
N-r   maxsplitr   c              3   2   K   | ]  }t        |        y wr`   )r0   )r   r   s     r5   r  zLskip_if_rocm_ver_lessthan_multiprocess.<locals>.decorator.<locals>.<genexpr>  s     &O!s1v&Os   .z-skip_if_rocm_ver_lessthan_multiprocess: ROCm z is available but z	 required)	r   r2   rm   r   hipr   tupler   r  )rr   r  rocm_versionrocm_version_tupler   s       r5   r   z9skip_if_rocm_ver_lessthan_multiprocess.<locals>.decorator  s    u}}001L'--cA->qAL!&&O|7I7I#7N&O!O"*?%g6HI[H\\novnw  xA  B:xvT16:4@@r4   r3   )r   r   s   ` r5   &skip_if_rocm_ver_lessthan_multiprocessr)    s    A r4   c                  <    t        t        j                  dk(  d      S )Nwin32z8This unit test case is not supported on Windows platform)r   rh   platformr3   r4   r5   skip_if_win32r-  )  s    )B r4   rb   majorminorc                     | j                   dk7  ryt        j                  j                  yt        j                  j                  |       ||fk\  S )z
    Returns True if the device's compute capability is (major, minor) or higher.
    Error out if the device is not a CUDA device.
    Returns False if device is a RoCM device.
    Returns True if device is a non-CUDA device.
    r'   TF)typerm   r   r%  r'   get_device_capability)rb   r.  r/  s      r5   sm_is_or_higher_thanr3  0  sD     {{f}}$::++F3u~EEr4   	localhostr   T   )minutesc                     t               }|rEt        |t        d      z        }t        j                  j
                  j                  | ||||      S t        j                  | |||||      S )zL
    Creates a TCP store. Retries if the chosen port is already in use.
    r   )milliseconds)wait_for_workers	use_libuv)r   r0   r   rm   classes	dist_c10dTCPStorer   )	addrrq   	is_mastertimeoutr9  	jit_classr:  porttimeout_milliseconds	            r5   create_tcp_storerD  A  sr     D!'I1,E"EF}}&&//$
I/B
 	
 }}-
 	
r4   i  !DISTRIBUTED_TESTS_DEFAULT_TIMEOUT300i  )test_ddp_uneven_inputstest_DistributedDataParallel   test_join_kwargs	lazy_initc                     t         j                  dk(  s| !t        j                  j	                  d|      S t        j                  j	                  | |      S )Nr+  z	127.0.0.1)hostnamerK  	interfacerK  )rh   r,  r   ProcessGroupGloocreate_devicerN  s     r5   rQ  rQ  o  s[    
||w)"3$$22 I 3 
 	
 $$229 3 
 	
r4   c                 Z    t         j                  | j                  d      d   t              S Nr$  r   )TIMEOUT_OVERRIDEr   r   TIMEOUT_DEFAULT)test_ids    r5   get_timeoutrW  z  s#    c 22 6HHr4   c               #   N  K   t               t               }} t        j                  t        j                  }}	 | |ct        _        t        _        t        j                  t        j                  f ||ct        _        t        _        y # ||ct        _        t        _        w xY wwr`   )r   rh   stdoutstderr)new_outnew_errold_outold_errs       r5   captured_outputr_  ~  sl     z8:WGzz3::WG2!('
CJjj#**$$!('
CJ'
CJs   5B%9B	 1B%	B""B%rankrq   
num_inputsc                    ddt         dt         dt         dt         fd}dt         fd}t        |d      t        |d	      t        |d
      t        |d      t        |d	      t        |d
      fD cg c]N  }t        |      D cg c]  } ||| z  |z   ||z         c}t        |      D cg c]  } ||||z         c}fP c}}S c c}w c c}w c c}}w )z
    Generate a number of basic test cases for sparse reduction.
    These cover tensors with a varying number of sparse dimensions and a varying
    number of dense dimensions. The only reduction operation we support is sum.
    r   r`  rq   sparse_dims
dense_dimsc           	         t        j                  t        j                  | dz         d| dz   f      }|gt        |      D cg c]  }d c}z   }t        |dz
        D ]A  }t        j                  |t        j
                  d| dz         f      }|j                  |       C t        j                  | dz   gt        |      D cg c]  }d c}z         }t        j                  |||      S c c}w c c}w )Nr   r   )	rm   reshapearangerangecatzerosappendonessparse_coo_tensor)r`  rq   rc  rd  indices_shapevaluess           r5   generatez,simple_sparse_reduce_tests.<locals>.generate  s     --TAX 6D1HF5+<=a=={Q' 	%Aii%++a*B CDGLL$	% TAXJU:5F)G!)GGH&&w>>  > *Hs   	C+	C0
c           
      |    t        t        j                  t        |      D cg c]  } | ||       c}      S c c}w r`   )r	   operatoraddrh  )r   rq   r`  s      r5   compute_sumz/simple_sparse_reduce_tests.<locals>.compute_sum  s2    LLE*<MND2dJ/N
 	
Ns   9
)rc  r      )rd  )r   r   )r0   r   rh  )r`  rq   ra  rr  rv  r   is          r5   simple_sparse_reduce_testsry    s    
?s 
? 
?# 
?s 
?
C 
 H!,H!,H!,H+H+H+
 	 z* :$q(*z*AB @EZ?PQ![Z*45Q	
  Rs$   5CC C/CC
Cr   c           
      f   t         j                  j                         }t        rt         j                  j                         }t
        rt         j                  j                         }t        |      }d}| |kD  r|| z  }t        |       D ci c]  }|t        |||z  |dz   |z          }}|S c c}w )zMultigpu tests are designed to simulate the multi nodes with multi
    GPUs on each node. Nccl backend requires equal #GPUs in each process.
    On a single node, all visible GPUs are evenly
    divided to subsets, each process only uses a subset.
    r   )	rm   r'   rn   r   r[   r   r(   rh  list)rq   r   nGPUsvisible_devicesnGPUs_per_processrx  rank_to_GPUs          r5   init_multigpu_helperr    s     JJ##%E		&&(		&&(ElO E!Z/ z" 	
4$5 5QBS8STUUK  	s   B.tmp_dirinit_methodc                    t        j                         at        j                  t        j
                  d<   t	        j                  t        j                  j                  t        j                  d             t	        j                  t        j                  j                  t        j                  d             t        j                  j                  t        j                  d      }t	        j                  |       | | t        j
                  d<   y t        t        j                  j                  |d      z   t        j
                  d<   y )NTEMP_DIRr   test_dirinit_dirINIT_METHODshared_init_file)
tempfileTemporaryDirectoryr  namerk   rl   mkdirpathjoinr   )r  init_dir_paths     r5   initialize_temp_directoriesr    s    ))+G$\\BJJzHHRWW\\',,	23HHRWW\\',,
34GGLLz:MHH]$/

=!$/"'',,-3
 %


=!r4   c                  :    t         t         j                          y y r`   )r  cleanupr3   r4   r5   cleanup_temp_dirr    s     r4   processcompletion_queuer@  c                    |dnt        dt        d|dz              }t        j                         }	 	 |j                  |      S # t        j
                  $ r= | j                         s*|j                         rt        d| j                         cY S Y nw xY w|+t        j                         |z
  }||kD  rt        d| d      S )zGet result from the completion_queue associated with process.

    When the process finished without putting a result or the timeout expired an exception instance will be returnedx   
      r@  zExited with zProcess timed out out after s)
maxmintimer   queueEmptyis_aliveemptyRuntimeErrorexitcode)r  r  r@  queue_timeout
start_timeelapseds         r5   %retrieve_result_from_completion_queuer    s     #?CBCA8N0OMJ
	G#'''>>{{ 	G ##%*:*@*@*B#l73C3C2D$EFF	G iikJ.G #&B7)1$MNN s   A ABBr  r   c            	       6    e Zd ZdZdZdefdZedefd       Zede	fd       Z
d Z	 dded	edd
f fdZd fdZd fdZdefdZddZddZ G d de      Zede	fd       Zede	dededd
fd       Zdedd
fdZddZddZddZedefd       Z xZS )MultiProcessTestCaser   r  r   c                      yr   r3   selfs    r5   _should_stop_test_suitez,MultiProcessTestCase._should_stop_test_suite  s    r4   c                      y)NTr3   r  s    r5   destroy_pg_upon_exitz)MultiProcessTestCase.destroy_pg_upon_exit$  s    r4   c                     t         S r`   DEFAULT_WORLD_SIZEr  s    r5   rq   zMultiProcessTestCase.world_size(      !!r4   c                 V    t              fd       }t        j                  ||       S )Nc                 j    | j                   | j                  k(  r| j                         y          y r`   )r`  MAIN_PROCESS_RANK_join_processesr  r   s    r5   rs   z1MultiProcessTestCase.join_or_run.<locals>.wrapper-  s(    yyD222$$R(r4   r
   types
MethodTyper  r   rs   s    ` r5   join_or_runz MultiProcessTestCase.join_or_run,  .    	r	 
	 ..r4   method_name
methodNameNc                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wNrunTestzno such test method in : super__init__r   setattrr  AttributeError
ValueError	__class__r  r  r  r   er  s        r5   r  zMultiProcessTestCase.__init__:      
 "$K%		{+BD+t'7'7';< 	Y& !-dnn-=R
|L '	   (A 	A6!A11A6c                    t         |           i | _        g | _        g | _        | j
                  | _        t        j                  d      5 }|j                  | _
        d d d        i | _        y # 1 sw Y   i | _        y xY w)NFdelete)r  setUpspecial_return_code_checksskip_return_code_checks	processesr  r`  r  NamedTemporaryFiler  	file_namepid_to_pipe)r  r   r  s     r5   r  zMultiProcessTestCase.setUpM  ss     13' .0$**	((6 	$!VVDN	$ 	$ s   A..A>c                 r    t         |           | j                  D ]  }|j                           g | _        y r`   )r  tearDownr  	terminate)r  pr  s     r5   r  zMultiProcessTestCase.tearDown_  s3     	AKKM	 r4   c                 F    | j                         j                  d      d   S rS  idr   r  s    r5   r   z'MultiProcessTestCase._current_test_namei  s    wwys#B''r4   c                    g | _         t        t        | j                              D ]  }t        j
                  j                         \  }} || j                  j                  dt        |      z   || j                         | j                  |fdt        | dd      i      }|j                          t        j                  d||j                          || j"                  |j                   <   | j                   j%                  |        y )Nprocess fake_pgF)targetr  ro   rp   Started process %s with pid %s)r  rh  r0   rq   rm   multiprocessingPiper  _runr2   r   r  r   startloggerinfopidr  rk  )r  procr`  parent_conn
child_connr  s         r5   _start_processesz%MultiProcessTestCase._start_processesm  s    #doo./ 	+D&+&;&;&@&@&B#K~~**#d)+++-NN	 wtY>G MMOKK8$L,7DW[[)NN!!'*%	+r4   c                     	 t         j                  j                  d       t         j                  j	                  d      j
                  }| j                  |       y # t        $ r Y Fw xY w)Nspawn)rm   r  set_start_methodr  get_contextProcessr  )r  r  s     r5   _spawn_processesz%MultiProcessTestCase._spawn_processes  s[    	!!227; $$009AAd#	  		s   A 	A('A(c                       e Zd ZdZy)MultiProcessTestCase.Eventr   N)r-   r.   r/   GET_TRACEBACKr3   r4   r5   Eventr    s    r4   r  r`  c                    t         j                  d|       	 t        j                  j	                  | |g      }| |v r| j
                  rt         j                  d|       y | j                         }t         j                  d||       |t        j                  j                  k(  rt        j                  d      5 }t        j                  |       |j                          |j!                  d       | j#                  |j%                                t         j                  d|       d d d        ||v ry # 1 sw Y   xY w)Nz*Starting event listener thread for rank %sz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)r  debugr  
connectionwaitclosedrecvr  r  r  r  r  r  faulthandlerdump_tracebackflushseeksendread)parent_pipesignal_piper`  ready_pipeseventtmp_files         r5   _event_listenerz$MultiProcessTestCase._event_listener  s   A4H)4499;:TUKk)%%LLT #((*=udK066DDD!44$? G8$33H= ( a(#((9$?FG k)5  G Gs   :A,D55D>r   r  c                 T     | |      }||_         ||_        |j                  ||       y r`   )r`  r  run_testclsr`  r   r  r	  rp   r  s          r5   r  zMultiProcessTestCase._run  s)     9~	"i-r4   c                 :   t         j                  j                  d      \  }}t        j                  t
        j                  ||| j                  fd      }|j                          t        j                  dk7  r2t        j                  dk7  rt         j                  j                  d       dt        j                  d<   t        j                           	  t#        | |              ||j=                  d        |t?        d      |jA                          |jC                          | jD                  r	 tG        jH                          y y # t$        j&                  $ rR}t(        j+                  d	| j                  ||       t        j,                  t.        d
   j0                         Y d }~d }~wt2        $ r t(        j5                  dt7        j8                         | j                  t
        j:                         |j=                  t7        j8                                t        j,                  t
        j:                         Y Zw xY w# ||j=                  d        |t?        d      |jA                          |jC                          w xY w# t>        tJ        f$ r Y y w xY w)NF)duplexT)r  ro   daemonr+  darwinr   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %srK   z;Caught exception: 
%s exiting process %s with exit code: %sz-Expected event_listener_thread to not be None)&rm   r  r  	threadingThreadr  r  r`  r  rh   r,  _C'_set_print_stack_traces_on_fatal_signalrk   rl   r   set_rng_seedr   r   SkipTestr  r  ri   rj   r+   r   r   	traceback
format_excTEST_ERROR_EXIT_CODEr  r   r  closer  r   destroy_process_groupr  )r  r   r	  signal_recv_pipesignal_send_pipeevent_listener_threadses          r5   r  zMultiProcessTestCase.run_test  s!   -2-B-B-G-Gu-G-U** ) 0 0'77/;!

 	##%<<7"s||x'? HH<<TB36

/0!!#	 $GD)$&(  + %%d+$,$%TUU!&&($$ **,	 %9    	6KKF			 HHZ	*4455 		@LLN$$&		$99	 Y1134HH)>>?		@  + %%d+$,$%TUU!&&( #J/ sK   E 2J I AF(#I (BI <I ?I  I AJJJc                    g }t        | j                        D ]h  \  }}|j                  | j                  |j                     }	 |j                  t        j                  j                         |j                  ||f       j |D ]x  \  }}	 |j                  d      rK|j                  rt        j                  d|       ;|j!                         }t        j#                  d||       nt        j#                  d|       z y # t        $ r t        j                  d|       Y w xY w# t        $ r t        j                  d|       Y w xY w)Nz>Encountered error while trying to get traceback for process %sr5  z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumerater  r  r  r  r  r  r  r  rk  ConnectionErrorr  	exceptionpollr  r  r  r   )r  pipesrx  r  piper`  r  s          r5   _get_timedout_process_tracebackz4MultiProcessTestCase._get_timedout_process_traceback  s0   #DNN3 
	JAw'''4II288FFGLL!T+
	   	JD$99Q<{{S  ! $		ILLEtY LLPRV!	 ' $$X4 #   Ts*   <D3D' >D'D$#D$'E	E	c                    t        | j                               }t        j                         }d}	 	 t        | j                        D ]w  \  }}|j
                  t        j                  k(  s$t        d| d|j
                   d       t        j                  j                         }|D ]  }|j                           d} n |rnt        d | j                  D              rntt        j                         |z
  }	|	|kD  rA| j                          t        d| d       | j                  D ]  }|j                           nt        j                  d	       #t        j                         |z
  }
| j!                  ||
       | j"                  j%                         D ]  }|j'                           y # | j"                  j%                         D ]  }|j'                           w xY w)
NFTProcess z terminated with exit code z", terminating remaining processes.c              3   8   K   | ]  }|j                   d u  y wr`   )r  )r   r  s     r5   r  z7MultiProcessTestCase._join_processes.<locals>.<genexpr>.  s     F!qzz-Fs   zTiming out after z" seconds and killing subprocesses.g?)rW  r  r  r(  r  r  r  r   printrm   r  active_childrenr  allr.  sleep_check_return_codesr  rq  r!  )r  r   r@  r  subprocess_errorrx  r  r3  acr  elapsed_timer-  s               r5   r  z$MultiProcessTestCase._join_processes  s   dggi(YY[
 &	%dnn5 DAq zz%9%N%NN&qc)DQZZLPrs +0*?*?*O*O*Q"1 +BLLN++/( $Ft~~FF))+
2W$88:+G94VW "^^ &&

3= @  99;3L$$R6 ((//1 

((//1 

s   9F. +DF. .1Gc           
         | j                   st        j                  d       y| j                   d   }t        | j                         D cg c]&  \  }}|j                  t
        j                  k(  r||f( }}}|r[d}|D ]I  \  }}| j                  |j                     j                         }	|d| dt
        j                   d|	 dz  }K t        |      t        | j                         D ]#  \  }}|j                  t        d| d	| d
       || j                  v ryt        j                         D ]q  }
|j                  |
j                  k(  st        r1t        j!                  d| j#                         |
j$                          yt'        j(                  |
j$                         d}|| j*                  v r| j*                  |   }| j-                  |j                  |d| d|j                   d|j                          yc c}}w )z
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        z<Note: no subprocesses were spawned, test was likely skipped.Nr    r0  z exited with error code z and exception:

 terminated or timed out after  seconds6Skipping %s on sandcastle for the following reason: %szExpected exit code z	 but got z
 for pid: )r   )r  r  warningr(  r  r  r   r  r  r  r  r  rj   rq  r+   r   r  r  r,   r   r  r  assertEqual)r  r   r9  first_processrx  r  errored_processesr   r  error_messageskipexpected_return_codes               r5   r6  z(MultiProcessTestCase._check_return_codesD  s#    ~~NNN q) "$..1
1zz1FFF F
 

 E/ 
7 $ 0 0 = B B Dqc!9:N:c:c9d e''4oR9 u%% dnn- 	DAqzz!"qc!@hW 	 ---%%' 	:D%%7 
 KKP	
 "++DLL99	:"  ! 000#'#B#B2#F "" %&:%;9]E[E[D\\fgtgxgxfyz 	 	
g
s   
+Hc                      | j                   dk(  S )Nr   r`  r  s    r5   r?  zMultiProcessTestCase.is_master  s    yyA~r4   r  r  r   N)r-   r.   r/   r  r   boolr  propertyr  r0   rq   r  r2   r  r  r  r   r  r  r   r  staticmethodr  classmethodr  r  r.  r  r6  r?  __classcell__r  s   @r5   r  r    s8   
   d   "C " "/ ?H8;	&$(C (+,$    < ..#&.36.	. .7# 7t 7r&P*XJ
X 4  r4   r  c                   >     e Zd Z fdZd ZdefdZddZd Z xZ	S )DistributedTestBasec                     t         |           t        | j                        t        j
                  d<   | j                          y )Nrf   )r  r  r2   rq   rk   rl   r  r  r  s    r5   r  zDistributedTestBase.setUp  s/    #&t#7

< r4   c                     	 t         j                  j                          	 t	        j
                  | j                         y # t        $ r Y ,w xY w# t        $ r Y y w xY wr`   )rm   distributedr"  r   rk   remover  OSErrorr  s    r5   r  zDistributedTestBase.tearDown  sU    	335	IIdnn%  		  		s"   A A 	AA	AAr   c                 "    d|v ryd|v ryd|v ryy)Nr'   r$   r[   r&   r(   r%   rV   r3   )r  rb   s     r5   r   zDistributedTestBase.backend  s$    Vf_f_r4   c                    || j                   }t        j                  |      j                         }t        j                  j                  | j                  |      }t        j                  j                  | j                  |      || j                  |       d| j                  |      v sd| j                  |      v r)t        j                  j                  | j                         t        j                  j                  j                         S )Nr   rq   r`  storer$   r%   )rq   rm   get_device_modulern   rV  	FileStorer  init_process_groupr   r`  acceleratorset_device_indexdistributed_c10d_get_default_group)r  rb   rq   num_visible_devicesr\  s        r5   	create_pgzDistributedTestBase.create_pg  s    J#55f=JJL!!++DNN<OP,,LL(!	 	- 	
 T\\&))Vt||F7K-K..tyy9  11DDFFr4   c                     t        j                  |      j                         }t        | j                        D ci c]	  }|||z  g c}S c c}w r`   )rm   r]  rn   rh  rq   )r  rb   rd  rx  s       r5   rank_to_devicez"DistributedTestBase.rank_to_device  sG    #55f=JJL6;DOO6LMA++,,MMMs   Ar`   )
r-   r.   r/   r  r  r2   r   re  rg  rO  rP  s   @r5   rR  rR    s%     
 GNr4   rR  subtest_configtest_fntest_kwargsc                    t        |j                               }|D cg c]  }|d   	 }}|D cg c]  }|d   	 }}t        j                  | D ]  }	t	        t        ||	d            }
 | j                  di |
5  t        j                  j                           ||i ||
 t        j                  j                          ddd       t        j                           yc c}w c c}w # 1 sw Y   *xY w)a\  
    Runs a test function given by ``test_fn`` as a subtest according to the
    configurations specified by ``subtest_config``. This amortizes the
    costly setup overhead (including process spawn and initializing the
    process group) over the subtests.

    Args:
        subtest_config (Dict[str, List[Any]]): A mapping from subtest
            keyword argument name to a list of its possible values.
        test_fn (Callable): A callable that runs the actual test.
        test_args: Positional arguments to pass to ``test_fn``.
        test_kwargs: Keyword arguments to pass to ``test_fn``.
    r   r   T)strictNr3   )r{  items	itertoolsproductdictzipsubTestrm   _dynamoresetr   r   )cls_instrh  ri  	test_argsrj  subtest_config_itemsitemsubtest_config_keyssubtest_config_valuesrq  subtest_kwargss              r5   run_subtestsr|    s    * 9=^=Q=Q=S8T:N%O$d1g%O%OBV-W$d1g-W-W##%:; c"5vdKLX// 	"MM!Y@+@@MM!	" 	 &P-W	" 	"s   C"C'<AC,,C5	c                  n    	 t        j                  g dd      j                  dk(  S # t        $ r Y yw xY w)a   
    If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
    Libfabric EFA interfaces and EFA software components installed,
    see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
    )fi_infoz-pefaz-t	FI_EP_RDMF)checkr   )
subprocessrun
returncodeFileNotFoundErrorr3   r4   r5   has_efar    sA    NN;5j	
  s   %( 	44c                  "    t               rddgS dS )a  
    If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
    'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
    uses InfiniBand transport, so we exclude it from tensorpipe transports,
    see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
    shmuvN)r  r3   r4   r5   tp_transportsr    s     $IE4=/4/r4   c                 d      t        t        |      S d t                fd       }|S )z+
    Wrapper to use with a test method
    )r@  rq   c                      t               t        j                         }fd fd}g }t               D ]=  }t	        j
                  |||f      }|j                          |j                  |       ? |S )Nc                  >     t         j                  j                  k(  S r`   r   rb  _worldworlds   r5   world_is_validzaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_valid      D118888r4   c                 ~   t        j                  d| |       	                  rt        j                          y y # t        $ rR}t        j                  j                  | t        j                         f       t        j                  |       Y d }~sd }~ww xY w#         rt        j                          w w xY w)Nthreadedr   r`  rq   r\  )r   r_  BaseExceptionMultiThreadedTestCaseexception_queueputrh   exc_infor"   exception_handler"  )r`  world_pgr\  excallbackr  rq   s       r5   workerzYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.worker  s    ##"*E
1
 "#..0 $ ! %55994:PQ!22  "#..0 $s*   A   	B	ABB BB B<r  ro   )r    r   	HashStorerh  r  r  r  rk  )	rq   r  global_storer  threadsr`  tr  r  s	   ``     @@r5   #_run_test_method_with_multi_threadszIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads  sq    $&~~'	9	1  *% 	D  dE<5PQAGGINN1	
 r4   c                 X    t         j                  j                  j                  d       	   fd      }t        j                  |       t         j                  j                  j                  d       y # t         j                  j                  j                  d       w xY w)NTc                       g i S r`   r3   )ro   rr   rp   r  s   r5   r   z?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>8  s    D$?$?$? r4   F)rm   r  _distributed_c10d_set_thread_isolation_moder  _join_threads)r  ro   rp   r  r  rr   rq   s   ``` r5   rs   z-spawn_threads_and_init_comms.<locals>.wrapper2  sv     	""==dC	I9?G "//>HH&&AA%HEHH&&AA%Hs   %A> >+B))r   spawn_threads_and_init_commsr
   )rr   r@  rq   rs   r  s   ` ` @r5   r  r    sD     |('j
 	
> 4[
I 
I Nr4   c                       e Zd ZdZ ej
                         ZdZd Z	 dde	de	ddf fdZ
d	 Zd
 Zd fdZ fdZd Zed        Zd Zed        Zed        Zedefd       Zede	fd       ZddddZddddZ xZS )r  a5  
    Test runner that runs all tests with the in-proc process group using
    multiple threads with the threaded process group.

    Each test spawns world_size threads and run the test method in each thread.

    Difference from regular MultiProcess test runner:
    Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
    Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
        to set up / tear down each thread when running each test.
    No global state possible
        How bad of a limitation is this?
    r   c                 V    t              fd       }t        j                  ||       S )Nc                     | j                   | j                  k(  r| j                  | j                         y          y r`   )r`  MAIN_THREAD_RANKr  r  r  s    r5   rs   z2MultiThreadedTestCase.join_or_run.<locals>.wrapperV  s.    yyD111""4<<4r4   r  r  s    ` r5   r  z!MultiThreadedTestCase.join_or_runU  r  r4   r  r  r   Nc                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wr  r  r  s        r5   r  zMultiThreadedTestCase.__init___  r  r  c                      y r`   r3   r  s    r5   perThreadSetUpz$MultiThreadedTestCase.perThreadSetUpr  s    r4   c                      y r`   r3   r  s    r5   perThreadTearDownz'MultiThreadedTestCase.perThreadTearDownv  s    r4   c                 x    t         |           | j                  | _        g | _        dt
        j                  d<   y)z
        setUp only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadSetUp
        r   r  N)r  r  r  r`  r  rk   rl   rT  s    r5   r  zMultiThreadedTestCase.setUpy  s1    
 	))	36

/0r4   c                 0    t         |           g | _        y)z
        tearDown only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadTearDown
        N)r  r  r  rT  s    r5   r  zMultiThreadedTestCase.tearDown  s    
 	r4   c                    t         j                  j                  j                  d       | j                  }t               t        j                         | j                  _	        fd} |       st        d      t        | j                        D ]e  }t        j                  | j                  j                  ||| j                  f      }|j!                          | j"                  j%                  |       g y)zk
        class method to spawn threads and run test, use this method in the SetUp of your TestCase
        Tc                  >     t         j                  j                  k(  S r`   r  r  s   r5   r  z<MultiThreadedTestCase._spawn_threads.<locals>.world_is_valid  r  r4   zInvalid worldr  N)rm   r  r  r  r   r    r   r  r  r  r  rh  rq   r  r  r  r  r  rk  )r  r   r  r`  r  r  s        @r5   _spawn_threadsz$MultiThreadedTestCase._spawn_threads  s     	""==dC++	$&&*nn&6#	9 //$//* 	#D  ~~**)T4??1SA GGILL"	#r4   c                     | |      }||_         t        |d      rWt        j                         |_        t
        j                  |j                  _        t
        j                  |j                  _	        |j                  |||       y )N_tls)r`  r   r  localr  r   
_precision	precision_rel_tolrel_tolrun_test_with_threaded_pg)r  r   r`  rq   rp   r  s         r5   r  zMultiThreadedTestCase._run  sb    9~	 4 !)DI"*"5"5DII ( 1 1DII&&y$
Cr4   c                    t        j                  d||| j                  j                         | j	                          	  t        | |              t        j                          | j                          y# t        $ rN}| j                  j                  |t        j                         f       t        j                  |       Y d}~wd}~ww xY w# t        j                          | j                          w xY w)zd
        Run the current test associated with `test_name` using the threaded process group.
        r  r  N)r   r_  r  r  r  r   r  r  r  rh   r  r"   r  r"  r  )r  r   r`  rq   r  s        r5   r  z/MultiThreadedTestCase.run_test_with_threaded_pg  s     	!..--		
 			%$GD)$& &&(""$  	  $$dCLLN%;<.. 	 &&(""$s*   A5 5	C>ACC CC &C5c           
         t         }	 t        |      D ]f  \  }}|j                  t        d|             |j	                         s2t
        j                  j                  |t        t        d| d      d ff       h t        j                          g }| j                  j                         sF| j                  j                         }|j                  |       | j                  j                         sFt                t        j                   j"                  j%                  d       | j'                  |||       y # t                t        j                   j"                  j%                  d       w xY w)Nr   zRank failed to join in under r>  F)rU  r(  r  r  r  r  r  r  TimeoutErrorr"   rt  r  r   rk  r!   rm   r  r  r  r6  )r  r  r   r@  idxthreadfailed_ranksfailures           r5   r  z#MultiThreadedTestCase._join_threads  s-   !	I(1 VC7O,??$)99== , ,&CG9H$U!" !%	 ##%L))//1--113##G, ))//1 #$HH&&AA%Hgr: #$HH&&AA%Hs   <D9 B,D9 95E.c                    d}d}|D ]&  \  }}|d   }t        |t        j                        r2t        j	                  d|||       |dk  sDt
        d   j                  }Xt        |t              r)d| d| d	}	t        j                  |	       t        |	      t        |t              rEdj                  t        j                  |       }	t        j                  d
|	|       |d| d|	 dz  }t        |t              st        |j                         t"        u s|dk  s|j                   }) t%        |      dkD  rt        |      |dkD  rqt
        j'                         D ]Y  }
||
j                  k(  st(        r#t        j	                  d||
j*                          y t        j                  |
j*                         y y )Nr;  r   r   z3Thread %s skipping test %s for following reason: %sr   rK   zThread r=  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:
r<  r?  )
isinstancer   r  r  r  rj   r+   r  r   r  r   r  r  format_exception
SystemExitr1  coder0   r   rq  r   r,   )r  r  r@  r   	error_msg	skip_coder`  r  excr   rE  s              r5   r6  z)MultiThreadedTestCase._check_return_codes  s    		* 	)ND(1+C#x001I	 q= *9 5 ? ?IC.v%DWIZXS!"3''C+ggi88(CDGdSwtf,EcU"MM	C,>S(Y] #I+	)0 y>Ay))q="))+ >.$T LL
 &//==> r4   c                     t         S r`   r  r  s    r5   rq   z MultiThreadedTestCase.world_size  r  r4   c                 F    | j                         j                  d      d   S rS  r  r  s    r5   r   z(MultiThreadedTestCase._current_test_name  s     wwys#B''r4   r   rH  c                J    | j                   |k(  r| j                  |||       yy)z
        The reason why we have this util function instead of
        self.assertEqual is all threads are sharing one CPU RNG
        so the assertion result is only reliable on rank 0
        N)r`  rA  r  r   yr   r`  s        r5   assertEqualOnRankz'MultiThreadedTestCase.assertEqualOnRank#  s'     99Q3' r4   c                H    | j                   |k(  r| j                  ||       y y r`   )r`  assertNotEqualr  s        r5   assertNotEqualOnRankz*MultiThreadedTestCase.assertNotEqualOnRank,  s#    991% r4   rI  rJ  r`   )r-   r.   r/   __doc__r  Queuer  r  r  r2   r  r  r  r  r  r  rN  r  r  r  r6  rL  r0   rq   r   r  r  rO  rP  s   @r5   r  r  B  s     "ekkmO/ ?H8;	&	7#. D D%. ; ;: /> />b "C " " (C ( (( (&1 & &r4   r  c                        e Zd Zdeej
                  ej                  f   deddf fdZ	dej                  dej                  fdZ
 xZS )SaveForwardInputsModuleforward_inputscast_forward_inputsr   Nc                 t    t         |           t        j                  dd      | _        || _        || _        y )Nd   )r  r  nnLinearlr  r  r  r  r  r  s      r5   r  z SaveForwardInputsModule.__init__2  s2    
 	3$,#6 r4   r   c                     || j                   | <   | j                  | j                  r3|j                  | j                  j                  j
                              S |      S r`   )r  r  r  toweightdtyper  r   s     r5   forwardzSaveForwardInputsModule.forward<  sI    $%D!vv43K3Kadd466==../SSQRSSr4   r-   r.   r/   rp  r  Modulerm   TensorrK  r  r  rO  rP  s   @r5   r  r  1  sT    7RYY457 "7 
	7T T%,, Tr4   r  c                        e Zd Zdeej
                  ej                  f   deddf fdZ	dej                  dej                  fdZ
 xZS )SaveForwardInputsModelr  r  r   Nc                 t    t         |           t        ||      | _        t        ||      | _        || _        y r`   )r  r  r  c1c2r  r  s      r5   r  zSaveForwardInputsModel.__init__B  s6    
 	).:MN).:MN,r4   r   c                 `    || j                   | <   | j                  | j                  |            S r`   )r  r  r  r  s     r5   r  zSaveForwardInputsModel.forwardL  s)    $%D!wwtwwqz""r4   r  rP  s   @r5   r  r  A  sQ    -RYY45- "- 
	-# #%,, #r4   r  c              #   X  K   |st         j                  j                  |        t         j                  j                         x}r|j                  nd}|t        j                  |      }dt        j                  d<   dt        j                  d<   |rp|rVt         j                  j                  j                  j                  j                         }t        j                  d|| |       nt        j                  || |       t         j                  j!                          t         j                  j"                  j$                  j'                          	 d  t         j                  j!                          t         j                  j"                  j$                  j'                          |rt        j(                          y y # t         j                  j!                          t         j                  j"                  j$                  j'                          |rt        j(                          w w xY ww)	Ncpur4  MASTER_ADDR6789MASTER_PORTfaker[  )r   r`  rq   )rm   r`  ra  current_acceleratorr1  r   get_default_backend_for_devicerk   rl   testing	_internalrV  r  	FakeStorer_  rs  rt  utilscountersclearr"  )r`  rq   r   init_pgr  accdevice_typer\  s           r5   _dynamo_dist_per_rank_initr  Q  s     **40 "--AACCSC%  55kB +BJJ} &BJJ}MM++77??IIKE##%	 ##G$:V	MM	MM  &&()$$**,&&(  	$$**,&&( s    EH*F> A(H*>A)H''H*c                   @     e Zd ZdZe fd       Ze fd       Z xZS )#DynamoDistributedSingleProcTestCasez
    Test harness for single-process dynamo distributed tests,
    initializes dist process group.

    Prefer this for simple tests, as it's easier to debug.
    c                    t         |           | j                  j                  t	        j
                  t        j                  ddd             d| _        t        j                  j                         j                  }| d| j                   | _        || j                  v rd n| j                  g| _        t        j                   t        j"                  |      | j                  d       y )Nr4  12355)r  r  r   r  r   )r`  rq   )r  
setUpClass_exit_stackenter_contextr   rp  rk   rl   r`  rm   r`  r  r1  rb   
device_idsr   r_  r  )r  rb   r  s     r5   r  z.DynamoDistributedSingleProcTestCase.setUpClass  s    %%JJ

#.#*	
 ""668==xq
+
!'3::!5CHH://7chhST	
r4   c                 J    t        j                          t        |           y r`   )r   r"  r  tearDownClassr  r  s    r5   r  z1DynamoDistributedSingleProcTestCase.tearDownClass  s    ""$r4   )r-   r.   r/   r  rN  r  r  rO  rP  s   @r5   r	  r	  x  s0     
 
(    r4   r	  c            	       H    e Zd ZdZedefd       Zededededdfd       Z	y)	"DynamoDistributedMultiProcTestCasea   
    Use this for tests that actually run on multiple GPUs.

    Decorate tests with @skip_if_lt_x_gpu(ngpu)

    Note: MultiProcTestCase spawns processes per test and is slow.
    Prefer MultiThreadedTestCase for most tests. Perhaps use this one
    sparingly for integration tests.
    r   c                 >    t         j                  j                         S r`   )rm   r`  rn   r  s    r5   rq   z-DynamoDistributedMultiProcTestCase.world_size  s      --//r4   r`  r   r  Nc                     t        j                  t        j                                 | |      }||_        ||_        |j                  ||       y r`   )r   
addHandlerloggingNullHandlerr`  r  r  r  s          r5   r  z'DynamoDistributedMultiProcTestCase._run  sB     	W0023 9~	"i-r4   )
r-   r.   r/   r  rL  r0   rq   rN  r2   r  r3   r4   r5   r  r    sV     0C 0 0 	.	.#&	.36	.		. 	.r4   r  c                       e Zd ZU dZdZeed<   dZeed<   dZe	dz  ed<    e
d      Ze
ed	<   d
Zeed<   d
Zeed<   ede	dz  fd       Zede	fd       Zed d       Zed        Zede	ddfd       Zed        Zed!d       Zede	defd       Ze fd       Zed        Ze fd       Zd! fdZd Z	 d"de	de	ddf fdZ xZS )#MultiProcContinuousTestr   rq   r`  N	rdvz_filer  )secondsr@  Fpoison_pill_processes_spawnedr   c                      y)z
        ProcessGroup backend str.
        To be customized by sub test classes, e.g. "nccl".
        Otherwise we return None -- lazily decided by tensor.
        Nr3   )r  s    r5   backend_strz#MultiProcContinuousTest.backend_str       r4   c                 \    t         j                  j                         }|y|j                  S )Nr  )rm   r`  r  r1  )r  curr_devices     r5   r  z#MultiProcContinuousTest.device_type  s+    '';;=r4   c                      y)z
        ProcessGroup init options.
        To be customized by sub test classes, e.g. ProcessGroupNCCLOpTest
        Here we return None.
        Nr3   )r  high_priority_streams     r5   optszMultiProcContinuousTest.opts  r#  r4   c                 J   |t        d      t        |      t        j                  d<   t	        j
                  ||      }t	        j                  | j                         |||| j                         | j                         t        j                  j                         | _        y )Nz!Expected rdvz_file to not be None
LOCAL_RANK)r   rq   r`  r\  
pg_optionsr@  )r   r2   rk   rl   r   r^  r_  r"  r(  r@  rb  rc  pg)r  r`  rq   r  r\  s        r5   _init_pgz MultiProcContinuousTest._init_pg  s     !DEE $'t9

< y*5OO%!xxzKK	
 &&99;r4   rV  c                     |j                  dd      d   } | |      }| j                  |_        | j                  |_        t        ||      }t	        j
                           |di | y )Nr$  r   r!  r   r3   )rsplitr`  rq   r   r   r  )r  rV  rp   r   r  ri  s         r5   _run_test_given_idz*MultiProcContinuousTest._run_test_given_id  sa     NN3N3B7	9~HH	..$	* 	!!# 	&r4   c                    d}d|cxk  r|k  sn t        d| d|       || _        || _        d }	 | j                  |||       t        j                  d       	 |j                         }
t        j                  d	|
        |
nK|%|j                  t        j                  |             S	 | j!                  |
       |j                  |
       vt        j                  d       |st5        j6                          y y # t        $ rO}t        |dd       t        fdt        j                         D        d       }	|	r|	j                  }n Y d }~d }~ww xY w# t"        $ r}t%        |t              rjt        |dd       t        fd
t        j                         D        d       }	|	r4|j                  t        j                  |	j                               Y d }~d}t'        j(                         }dj+                  t-        j.                  |       }t1        d|       }||_        |j                  |       Y d }~od }~ww xY w)NFr   z*Expected 0 <= rank < world_size, got rank=z, world_size=r  c              3   B   K   | ]  }|j                   k(  s|  y wr`   r+   r   vr+   s     r5   r  z7MultiProcContinuousTest._worker_loop.<locals>.<genexpr>  s     Lq1;;)3KL   zSetup completeTz	Got test c              3   B   K   | ]  }|j                   k(  s|  y wr`   r3  r4  s     r5   r  z7MultiProcContinuousTest._worker_loop.<locals>.<genexpr>;  s     Tq1;;);STr6  r;  zException in worker process:
zTerminating ...)r   r`  rq   r-  r  r   nextrj   rq  r,   r  r  r   r  r   r  r0  r  r  rh   r  r  r  r  r  	__cause__r   r"  )r  r`  rq   r  
task_queuer  raised_exceptioninit_skip_reasonr  
skip_entryrV  r  tb_strenhanced_exr+   s                 @r5   _worker_loopz$MultiProcContinuousTest._worker_loop  s*    T&J& <TF-PZ|\  #  	LLz95 	%&  nn&GLL9WI./  + $$X%6%67G%HI2&&w/ $$W- V 	&'
  &&(    		FD1ILJ--/LJ #-#5#5  !		> ! 2b*- 'FD 9I "&TJ$5$5$7T"J "(,,X->->z?Q?Q-RS #' <<>!;!;X!FG*-KF8+TU(*% $$[1112s8   C? /"E ?	EAEE	I#A4I AI  Ic                 J   g | _         g | _        g | _        t        j                  d      5 }|j
                  | _        d d d        	 t        j                  j                  d       t        t        |            D ]	  }t        j                  j                         }t        j                  j                         }t        j                  j                  | j                  dt!        |      z   d||| j                  ||f      }|j#                          | j                   j%                  |       | j                  j%                  |       | j                  j%                  |       t&        j)                  d||j*                          y # 1 sw Y   LxY w# t        $ r Y ;w xY w)NFr  r  r  T)r  r  r  ro   r  )r  task_queuescompletion_queuesr  r  r  r  rm   r  r  r  rh  r0   r  r  r@  r2   r  rk  r  r  r  )r  rq   r   r`  r:  r  r  s          r5   r  z(MultiProcContinuousTest._spawn_processesW  sX    "((6 	#!FFCM	#
	!!227;
 #j/* 	ND..446J$44::<++33''#d)+JzCST	 4 G MMOMM  )OO"":.!!(()9:LL94M	N	# 	#  		s   FF F	F"!F"r  c                 F   t        j                  | dd      }t        |t              r't        j                  |       }|j                  |      }n| j                  }|dk(  rAt        j                  |      j                         }|dk(  rt        j                  d| d      |S )z
        Get world_size, handling both class variable and property definitions.
        Properties are instance-level and need special handling in class methods.
        rq   Nr  r   zNo z devices available)inspectgetattr_staticr  rL  object__new__fgetrq   rm   r]  rn   r   r  )r  r  world_size_attrtemp_instancerq   s        r5   _get_world_sizez'MultiProcContinuousTest._get_world_sizew  s     "00lDIox0 #NN3/M(--m<JJ 00=JJLJQ''#k]:L(MNNr4   c                 "    t         |           y)a  
        Class-scope test fixture. Run once for entire test class, before any test starts.
        Note: Process spawning is deferred to setUp to support instantiate_device_type_tests,
        which calls setUpClass during class creation before any tests run.
        N)r  r  r  s    r5   r  z"MultiProcContinuousTest.setUpClass  s     	r4   c                 :   | j                   ry| j                  j                  d| j                        }t	        |t
              r|j                  |       }n7t	        |t              r|j                  |       }nt        |      r |       }n|}| j                  |      | _        t        | j                        r| j                         n| j                  }|rt        j                  t        j                  t        j                   t        j"                  d}|j                  |      }|  |       st%        j&                  d| d      t(        j+                  d| j,                   d| j                   d|        | j/                  | j                         d	| _         y)
z
        Lazily spawn worker processes on first test run.
        This supports instantiate_device_type_tests which calls setUpClass during
        class creation (before any tests run), when spawning would be premature.
        Nr  )r$   rV   rS   r%   z	Backend 'z' is not availablezTesting class z on  T)r   __dict__r   r  r  rN  __func__rL  rI  r   rL  rq   r"  r   r   r   r   r   r   r  r  r  r-   r  )r  device_type_attrr  r   backend_checkscheck_fns         r5   _ensure_processes_spawnedz1MultiProcContinuousTest._ensure_processes_spawned  s]    !!
 <<++M3??K&4*33C8K((3 +//4K&'*,K*K ,,[9 (0'@#//#coo....,,..	N &))'2H#HJ'')G9<N(OPPS\\N$s~~.>a}M	
 	S^^,!%r4   c                    | j                   st        | 	          yt        j	                  d| j
                   d       | j                  D ]  }|j                  d        | j                  D ]  }|j                           	 t        j                  | j                         t        j                  d| j                   d       t        | 	          y# t        $ r Y =w xY w)z
        Class-scope test fixture. Run once for entire test class, after all tests finish.
        Tear down the process group if spawned.
        NzJoining z workerszClass z	 finished)r   r  r  r  r  rq   rB  r  r  r  rk   rW  r  rX  r  r-   )r  r:  r  r  s      r5   r  z%MultiProcContinuousTest.tearDownClass  s     %%G!#x/x89// 	!JNN4 	! }} 	GLLN		IIcmm$ 	fS\\N)45	  		s   C 	C! C!c                    t         |           | j                  j                          | j                  | _        | j                  j                  r&t        j                  d| j                                t        | j                        D ]M  \  }}t        j                  d| d| j                                 |j                  | j                                O y)z5
        Test fixture. Run before each test.
        zPrevious test failed, skipping zSending Rank r  N)r  r  r  rU  r  r`  r  r   r  r  r(  rB  r  r  r  )r  rx  r:  r  s      r5   r  zMultiProcContinuousTest.setUp  s     	 	002 **	 >>%%##&Edggi[$QRR 't'7'78 	&MAzLL=2dggi[9:NN4779%	&r4   c                 V    t              fd       }t        j                  ||       S )Nc           	         | j                   | j                  k(  rgt        j                  d| j	                                 d }t        t        | j                  | j                              D ]  \  }\  }}t        ||t        | j	                                     }|2t        |t        j                        r|}Ot        |t              rTt        j                  d| d| j	                          d| j                   j"                          d| j                   _        |}|| j	                         k7  rt'        d| d| j	                                t        j                  d	| d
| j	                                  ||y          y )NzWaiting for workers to finish r  zDetected failure from Rank z in: z(, skipping rest of tests in Test class: TzExpected rv == self.id(), got z != zMain proc detected rank z
 finished )r`  r  r  r  r  r(  rq  r  rC  r  rW  r  r   r  r  r@  r  r-   r  r   )r  deferred_exceptionrx  r  r  rvr   s         r5   rs   z>MultiProcContinuousTest._worker_run_main_wait.<locals>.wrapper  sz   yyD222=dggi[IJ &*"09(>(>?1 ,A,+ ?+[5KB *5 !"h&7&78-/* !"m49!E$'') MEEI^^E\E\D]_ 6:2-/*  TWWY,<RDTWWYKP  LL21#Z	{K5< &1,, 2 r4   r  r  s    ` r5   _worker_run_main_waitz-MultiProcContinuousTest._worker_run_main_wait  s/    	r(	 
(	T ..r4   r  r  c                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wr  )r  r  r   r  r\  r  r  r  r  s        r5   r  z MultiProcContinuousTest.__init__-  s    
 "$K%		{+BD+t'A'A"'EF 	Y& !-dnn-=R
|L '	r  )FrJ  rI  )r-   r.   r/   r  rq   r0   r1   r`  r  r2   r   r@  r  rK  r   rN  r"  r  r(  r-  r0  r@  r  rL  r  rU  r  r  r\  r  rO  rP  s   @r5   r  r    s   JD#N IsTz "3/GY/K$$C$J    C       < <&  4   O) O)b N N> # #  .   .& .&`    8&*,/f ?H8;	 r4   r  r`   r   )r   rJ  r   )r  	functoolsrE  rn  r  r  rt  rk   r  r  rh   r  r  r  r  r  r   collections.abcr   
contextlibr   dataclassesr   datetimer   enumr   r   r	   r
   ior   typingr   r   unittest.mockr   rm   torch._dynamo.test_casetorch.cuda.nccltorch.distributedrV  r   torch.nnr  torch._C._autogradr   torch._C._distributed_c10dr   torch._logging._internalr   torch.testing._internalr   $torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   r   r   r   r   5torch.testing._internal.distributed.multi_threaded_pgr    r!   r"   	getLoggerr-   r  setLevelINFOr  ra   HAS_ACCELERATORr*   rj   rR   rc   rv   r}   r   r   r   r   rK  r   r   r0   r   r   r   r   r   r   r   r   r   r   r   r   r  r
  r  r  r1   r  r&  r2   r  r)  r-  rb   r3  rD  rU  getenvrT  rQ  rW  r_  ry  r  r  r  r  r  r  r  r  r  r  r'   rn   r  r  rR  rp  r{  r|  cacher  r  r  r  r  r  r  r  rs  	test_caser	  r  r  r3   r4   r5   <module>rx     s          	   
       $ % !   , ,  "        ) 7 . 0     
		8	$  4 E? 3x38z 
8
C x$FG	
 Xb"BC x45 8B => 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? HR>?  (267!" hr#LM#$ x
V%* 8B DE+, hr#BC-
4 * * **&6   &+ 83 , s s ("
,+\4
N: $+1$ D 
S
sCx *F Fc F# F$ F" 	a 
 
: O)"))$GOPO!$'  +.'(
T 
IC I 2 2(S (c (s (XS 3 2 /3	$	$t	+ 2
S4Z 
4 
" O""**O++11O 4ZO 		O6 QAuzz'>'>'@ AB8 N+N. +N\d3i( 
 D   &0 
3E7tl&H l&^Tbii T #RYY #  :?#) #)L  %--*A*A*J*J   F.)< .8Gh Gr4   