
    rh}                        U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d dlmZmZm Z m!Z!m"Z" d d	l#m$Z$ d dl%Z%d dl&Z%d dl'Z%d dl(m)Z* d dl+m,Z, d d
l-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z? d dl@mAZAmBZBmCZC  ej                  eE      ZFeFj                  ej                         g dZIddgZJe:xs e;xs e>ZK G d de       ZLi d eLdd      d eLdd      d eLdd      d eLdd      d  eLd!d"      d# eLd$d%      d& eLd'd(      d) eLd*d+      d, eLd-d.      d/ eLd0d1      d2 eLd3d4      d5 eLd6d7      d8 eLd9d:      d; eLd<d=      d> eLd?d@      dA eLdBdC      dD eLdEdF      dG eLdHdI      iZMe G dJ dK             ZNdL ZOdM ZPdN ZQdO ZRdP ZSdQ ZTdR ZUdS ZVdT ZWdU ZXdV ZYdW ZZdX Z[dY Z\dZ Z]d[ Z^d\ Z_dd]Z`d^ Zad_ Zbd` Zcdae%j                  dbeedceeddeffdeZge7dfdgdh edij      dhdkdhfdl       Zhe=rdmZin ee ej                  dndo            ZidpdqiZke<rdrekds<   ddteffduZlddeefdvZmedw        Znddxeedyeedzeefd{Zodyeed|epfd}Zqdare!e	j                     etd~<   dde!ep   dddfdZuddZvdZw G d de?      Zx G d dex      Zydezepe{e   f   dedefdZ|da}ddeffdZ~d ZdeiewfdZ G d de?      Z G d de,j                        Z G d de,j                        Ze	 dd       Z G d de%j                  j                  j~                        Z G d dey      Z G d de?      Zy)    N)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)AnyCallable
NamedTupleOptionalUnion)patch)
DeviceType)_SymmetricMemory)	trace_log)FILE_SCHEMAfind_free_portIS_SANDCASTLEretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_if	TEST_CUDATEST_HPUTEST_WITH_ROCMTEST_WITH_TSANTEST_XPUTestCase)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroupncclxcclhcclcudaxpuc                   "    e Zd ZU eed<   eed<   y)TestSkip	exit_codemessageN)__name__
__module____qualname__int__annotations__str     }/var/www/html/ai-insurance-compliance-backend/venv/lib/python3.12/site-packages/torch/testing/_internal/common_distributed.pyr*   r*   @   s    NLr4   r*   backend_unavailableH   z5Skipped because distributed backend is not available.small_worldsizeI   z Skipped due to small world size.odd_worldsizeW   zSkipped due to odd world size.no_cudaJ   zCUDA is not available.zmulti-gpu-1K   zNeed at least 1 CUDA devicezmulti-gpu-2M   zNeed at least 2 CUDA deviceszmulti-gpu-3P   zNeed at least 3 CUDA deviceszmulti-gpu-4Q   zNeed at least 4 CUDA deviceszmulti-gpu-5R   zNeed at least 5 CUDA deviceszmulti-gpu-6S   zNeed at least 6 CUDA deviceszmulti-gpu-7T   zNeed at least 7 CUDA deviceszmulti-gpu-8U   zNeed at least 8 CUDA devicesr$   L   z#c10d not compiled with NCCL support
skipIfRocmN   zTest skipped for ROCmno_peer_accessO   z'Test skipped because no GPU peer accessgenericV   zHTest skipped at subprocess level, look at subprocess log for skip reasonimporterrorX   z"Test skipped due to missing importno_acceleratorY   zaccelerator is not available.c                       e Zd Zi Zh ded<    e       ed<   ddhed<   ddhed<   i Zh ded	<   h ded
<   h ded<   h ded<    e       ed<   erdhed<   erdhed<   yy)DistTestCases>   mpiuccr$   allgather_coalescedr   r$   rT   zsendrecv anysourcezcpu barrier>   rT   gloor$   gpur'   ddpsubgrouppluginr&   hpur%   r(   N)r-   r.   r/   skip_collectivesetbackend_featurer   r   r3   r4   r5   rR   rR   _   s     O-CO)* #OH-3UOO()&,e_OM" O4OE5OF4OE"9OJ #OH"("( r4   rR   c                     | t         v S N)DDP_RANK_DEVICES)devices    r5   requires_ddp_rankrc   u   s    %%%r4   c                 .     t                fd       }|S )zSkips if the world size exceeds the number of GPUs, ensuring that if the
    test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                     t         s2t        s,t        s&t        j                  t
        d   j                         t        t        j                  d         }t         rJt        j                  j                         |k  r)t        j                  t
        d|    j                         t        rJt        j                  j                         |k  r)t        j                  t
        d|    j                         t        rJt        j                  j                         |k  r)t        j                  t
        d|    j                          | i |S )Nr<   
WORLD_SIZE
multi-gpu-)r   r   r   sysexit
TEST_SKIPSr+   r0   osenvirontorchr'   device_countr[   r(   )argskwargs
world_sizefuncs      r5   wrapperzskip_if_no_gpu.<locals>.wrapper}   s    XHHZ	*445L12
002Z?HHZ*ZL 9:DDE		..0:=HHZ*ZL 9:DDE		..0:=HHZ*ZL 9:DDET$V$$r4   r	   rr   rs   s   ` r5   skip_if_no_gpurv   y   s"     4[% % Nr4   c                 .     t                fd       }|S )Nc                      t         j                  d   dk7  rEt        t         j                  d         dk  r&t        j                  t
        d   j                          | i |S )NBACKENDrS   rf      r8   rk   rl   r0   rh   ri   rj   r+   ro   rp   rr   s     r5   rs   z(skip_if_small_worldsize.<locals>.wrapper   sR    JJy!U*BJJ|4L0MPQ0QHHZ 12<<=T$V$$r4   rt   ru   s   ` r5   skip_if_small_worldsizer}           
4[% % Nr4   c                 .     t                fd       }|S )Nc                      t         j                  d   dk7  rHt        t         j                  d         dz  dk(  r&t        j                  t
        d   j                          | i |S )Nry   rS   rf         r:   r{   r|   s     r5   rs   z&skip_if_odd_worldsize.<locals>.wrapper   sW    JJy!U*BJJ|4L0MPQ0QUV0VHHZ0::;T$V$$r4   rt   ru   s   ` r5   skip_if_odd_worldsizer      r~   r4   c                       fd}|S )Nc                 4     t                fd       }|S )Nc                      dk(  rKt         j                  j                         k  r*t        j                  t
        d    j                         y  | i |S Nr$   rg   )rm   r'   rn   rh   ri   rj   r+   )ro   rp   backendrr   ns     r5   rs   zCrequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapper   sM    & UZZ%<%<%>%Bj$45??@T,V,,r4   rt   )rr   rs   r   r   s   ` r5   	decoratorz2require_n_gpus_for_nccl_backend.<locals>.decorator   s     	t	- 
	- r4   r3   )r   r   r   s   `` r5   require_n_gpus_for_nccl_backendr      s     r4   c                      d } | S )Nc                 .     t                fd       }|S )Nc                      	 ddl m}m}  | i |S # t        $ r) t	        j
                  t        d   j                         Y y w xY w)Nr   )AutoModelForMaskedLM
BertConfigrM   )transformersr   r   ImportErrorrh   ri   rj   r+   )ro   rp   r   r   rr   s       r5   rs   z?import_transformers_or_skip.<locals>.decorator.<locals>.wrapper   sA    >IT,V,, >M2<<=>s    /AArt   ru   s   ` r5   r   z.import_transformers_or_skip.<locals>.decorator   s     	t	> 
	> r4   r3   )r   s    r5   import_transformers_or_skipr      s    
 r4   c                     t         r"t        j                  j                         | k\  ryt        r"t        j
                  j                         | k\  ryt        r"t        j                  j                         | k\  ryy)NTF)r   rm   r'   rn   r   r[   r   r(   )xs    r5   at_least_x_gpur      sS    UZZ,,.!3EII**,1EII**,1r4   c                       fd}|S )Nc                 2     t                fd       }|S )Nc                     t         j                  j                         r)t         j                  j                         k\  r | i |S t        r)t         j
                  j                         k\  r | i |S t        r)t         j                  j                         k\  r | i |S t        j                  t        d    j                         y )Nrg   )rm   r'   is_availablern   r   r[   r   r(   rh   ri   rj   r+   )ro   rp   rr   r   s     r5   rs   z4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   s    zz&&(UZZ-D-D-F!-KT,V,,EII2249T,V,,EII2249T,V,,HHZ*QC 01;;<r4   rt   )rr   rs   r   s   ` r5   r   z#skip_if_lt_x_gpu.<locals>.decorator   s     	t	= 
	= r4   r3   )r   r   s   ` r5   skip_if_lt_x_gpur      s     r4   c                       fd}|S )Nc                 4     t                fd       }|S )Nc                      dk7  r | i |S t         j                  j                         r)t         j                  j                         k\  r | i |S t	        j
                  t        d    j                         y r   )rm   r'   r   rn   rh   ri   rj   r+   )ro   rp   r   rr   r   s     r5   rs   z9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   sm    & T,V,,zz&&(UZZ-D-D-F!-KT,V,,HHZ*QC 01;;<r4   rt   )rr   rs   r   r   s   ` r5   r   z(nccl_skip_if_lt_x_gpu.<locals>.decorator   s     	t	= 
	= r4   r3   )r   r   r   s   `` r5   nccl_skip_if_lt_x_gpur      s    	 r4   c                     | j                         }d|v sJ d|v sJ d|v sJ |d   }|j                  d      dk(  r|n|j                  d      d   }||v sJ d| d|        y )	N	iteration	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )_get_ddp_logging_datafindsplit)	model_DDP
err_substrddp_logging_datalogging_erractuals        r5   verify_ddp_error_loggedr      s     668********&&&&"7+K ??56"< 	89!<  	+R	x'CK=QRr4   c                 .     t                fd       }|S )aJ  
    Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
    this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
    the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
    TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
    c                     	 t         j                  d   }t         j                  d= 	 t         j                  d   }dt         j                  d<   	  | i |}|||t         j                  d<   ||t         j                  d<   S S # t        $ r d }Y jw xY w# t        $ r d }Y gw xY w# dt         j                  d<   w xY w# ||t         j                  d<   ||t         j                  d<   w w xY w)NTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAIT1)rk   rl   KeyError)ro   rp    cached_nccl_async_error_handlingcached_nccl_blocking_waitretrr   s        r5   rs   z(with_nccl_blocking_wait.<locals>.wrapper  s   	4AC1B, 

<=	9:<***;% 69BJJ12	S''C 0; 5 

5 )49R

56 51  	4/3,	4  	-(,%	- 69BJJ12 0; 5 

5 )49R

56 5s@   $B B 	B> BBB# B& "B##B& &B;>-C+rt   ru   s   ` r5   with_nccl_blocking_waitr     s%     4[ S  SD Nr4   c                       fd}|S )zK
    Runs a test for each distributed debug level specified in levels.
    c                 2     t                fd       }|S )Nc                     t         j                  j                  dd       }D ][  }|t         j                  d<   t        j                           | i |}t        j
                          |I|t         j                  d<   ] S )NTORCH_DISTRIBUTED_DEBUG)rk   rl   getc10dset_debug_level_from_envbarrier)ro   rp   	old_levellevelr   rr   levelss        r5   rs   z:with_dist_debug_levels.<locals>.decorator.<locals>.wrapper9  sx    

'@$GI F8=

45--/D+F+(<EBJJ89F Jr4   rt   )rr   rs   r   s   ` r5   r   z)with_dist_debug_levels.<locals>.decorator8  s     	t	 
	 r4   r3   )r   r   s   ` r5   with_dist_debug_levelsr   3  s    
$ r4   c                  @    t        t        j                          d      S )Nz+c10d was not compiled with the Gloo backend)r   r   is_gloo_availabler3   r4   r5   requires_gloor   M  !    )""$$5 r4   c           	         t        j                         st        d      S t        t        j
                  j                  j                         | k  d|  dt        j
                  j                  j                          d|       S )N+c10d was not compiled with the NCCL backendz0Requires NCCL version greater than or equal to: z	, found: z
, reason: )r   is_nccl_availabler   r   rm   r'   r$   version)r   msgs     r5   requires_nccl_versionr   T  sv    !!#*9
 	
 .JJOO##%/>wiyQVQ[Q[Q`Q`QhQhQjPkkuvyuz{
 	
r4   c                  @    t        t        j                          d      S )Nr   )r   r   r   r3   r4   r5   requires_ncclr   `  r   r4   c                  @    t        t        j                          d      S )Nz*c10d was not compiled with the UCC backend)r   r   is_ucc_availabler3   r4   r5   requires_uccr   g  !    )!!##4 r4   c                  @    t        t        j                          d      S )Nz*c10d was not compiled with the MPI backend)r   r   is_mpi_availabler3   r4   r5   requires_mpir   n  r   r4   c                 V    | t         } t        d | D              }t        | d|        S )a  
    Decorator to skip tests if no accelerator communication backend (NCCL, XCCL, HCCL) is available.

    Args:
        backends (Optional[List[str]]): Specific accelerator backends to check (e.g., ["nccl", "xccl", "hccl"]).
                                       If None, checks all supported accelerator backends (NCCL, XCCL, HCCL).

    Returns:
        callable: A decorator that skips the test if no specified accelerator backend is available.
    c              3      K   | ]=  }	 t        j                  t         j                  d  dj                  |d               ? yw)c                      t         S r`   )r   r3   r4   r5   <lambda>z=requires_accelerator_dist_backend.<locals>.<genexpr>.<lambda>  s    H r4   r#   c                       yNFr3   r3   r4   r5   r   z=requires_accelerator_dist_backend.<locals>.<genexpr>.<lambda>  s    r4   N)r   r   is_xccl_availabler   ).0r   s     r5   	<genexpr>z4requires_accelerator_dist_backend.<locals>.<genexpr>  sG       	&****$	
 #g}
%		(s   AAz5No accelerator communication backend available among )ACCELERATOR_DIST_BACKENDSanyr   )backendsbackend_availables     r5   !requires_accelerator_dist_backendr   u  sH     ,     *
?zJ r4   c                      t         j                  j                         xr$ t        j                  t
        j                  d      } t        |  d      S )Nr   z"multicast support is not available)rm   r'   r   r   has_multicast_supportr   CUDAr   )r   s    r5   requires_multicast_supportr     sI    

! 	G22:??AF  *!!, r4   c                 <     d _         t                fd       }|S )zSkips a test for ROCmTc                  n    t         s | i |S t        j                  t        d   j                         y )NrG   )r   rh   ri   rj   r+   r|   s     r5   rs   z*skip_if_rocm_multiprocess.<locals>.wrapper  s-    (((L)334r4   )skip_if_rocm_multiprocessr	   ru   s   ` r5   r   r     s(    %)D"
4[5 5
 Nr4   c                  <    t        t        j                  dk(  d      S )Nwin32z8This unit test case is not supported on Windows platform)r   rh   platformr3   r4   r5   skip_if_win32r     s    )B r4   rb   majorminorreturnc                     | j                   dk7  rt        d      t        j                  j                  yt        j
                  j                  |       ||fk\  S )z
    Returns True if the device's compute capability is (major, minor) or higher.
    Error out if the device is not a CUDA device.
    Returns False if device is a RoCM device.
    r'   z3sm_is_or_later() is only supported for CUDA devicesF)type
ValueErrorrm   r   hipr'   get_device_capability)rb   r   r   s      r5   sm_is_or_higher_thanr     sM     {{fNOO}}$::++F3u~EEr4   	localhostr   T   )minutesFc                     t               }|rEt        |t        d      z        }t        j                  j
                  j                  | ||||      S t        j                  | |||||      S )zL
    Creates a TCP store. Retries if the chosen port is already in use.
    r   )milliseconds)wait_for_workers	use_libuv)r   r0   r   rm   classes	dist_c10dTCPStorer   )	addrrq   	is_mastertimeoutr   	jit_classr   porttimeout_milliseconds	            r5   create_tcp_storer	    sr     D!'I1,E"EF}}&&//$
I/B
 	
 }}-
 	
r4   i  !DISTRIBUTED_TESTS_DEFAULT_TIMEOUT300test_ddp_uneven_inputsi     test_join_kwargs	lazy_initc                     t         j                  dk(  s| !t        j                  j	                  d|      S t        j                  j	                  | |      S )Nr   z	127.0.0.1)hostnamer  	interfacer  )rh   r   r   ProcessGroupGloocreate_devicer  s     r5   r  r    s[    
||w)"3$$22 I 3 
 	
 $$229 3 
 	
r4   c                 Z    t         j                  | j                  d      d   t              S N.r   )TIMEOUT_OVERRIDEr   r   TIMEOUT_DEFAULT)test_ids    r5   get_timeoutr    s#    c 22 6HHr4   c               #   N  K   t               t               }} t        j                  t        j                  }}	 | |ct        _        t        _        t        j                  t        j                  f ||ct        _        t        _        y # ||ct        _        t        _        w xY wwr`   )r
   rh   stdoutstderr)new_outnew_errold_outold_errs       r5   captured_outputr$    sl     z8:WGzz3::WG2!('
CJjj#**$$!('
CJ'
CJs   5B%9B	 1B%	B""B%rankrq   
num_inputsc                    ddt         dt         dt         dt         fd}dt         fd}t        |d      t        |d	      t        |d
      t        |d      t        |d	      t        |d
      fD cg c]N  }t        |      D cg c]  } ||| z  |z   ||z         c}t        |      D cg c]  } ||||z         c}fP c}}S c c}w c c}w c c}}w )z
    Generate a number of basic test cases for sparse reduction.
    These cover tensors with a varying number of sparse dimensions and a varying
    number of dense dimensions. The only reduction operation we support is sum.
    r   r%  rq   sparse_dims
dense_dimsc           	         t        j                  t        j                  | dz         d| dz   f      }|gt        |      D cg c]  }d c}z   }t        |dz
        D ]A  }t        j                  |t        j
                  d| dz         f      }|j                  |       C t        j                  | dz   gt        |      D cg c]  }d c}z         }t        j                  |||      S c c}w c c}w )Nr   r   )	rm   reshapearangerangecatzerosappendonessparse_coo_tensor)r%  rq   r(  r)  indices_shapevaluess           r5   generatez,simple_sparse_reduce_tests.<locals>.generate  s     --TAX 6D1HF5+<=a=={Q' 	%Aii%++a*B CDGLL$	% TAXJU:5F)G!)GGH&&w>>  > *Hs   	C+	C0
c           
      |    t        t        j                  t        |      D cg c]  } | ||       c}      S c c}w r`   )r   operatoraddr-  )fnrq   r%  s      r5   compute_sumz/simple_sparse_reduce_tests.<locals>.compute_sum  s2    LLE*<MND2dJ/N
 	
Ns   9
)r(  r      )r)  )r   r   )r0   r   r-  )r%  rq   r&  r7  r<  r;  is          r5   simple_sparse_reduce_testsr?    s    
?s 
? 
?# 
?s 
?
C 
 H!,H!,H!,H+H+H+
 	 z* :$q(*z*AB @EZ?PQ![Z*45Q	
  Rs$   5CC C/CC
Cr   c           
      f   t         j                  j                         }t        rt         j                  j                         }t
        rt         j                  j                         }t        |      }d}| |kD  r|| z  }t        |       D ci c]  }|t        |||z  |dz   |z          }}|S c c}w )zMultigpu tests are designed to simulate the multi nodes with multi
    GPUs on each node. Nccl backend requires equal #GPUs in each process.
    On a single node, all visible GPUs are evenly
    divided to subsets, each process only uses a subset.
    r   )	rm   r'   rn   r   r[   r   r(   r-  list)rq   r   nGPUsvisible_devicesnGPUs_per_processr>  rank_to_GPUs          r5   init_multigpu_helperrF  2  s     JJ##%E		&&(		&&(ElO E!Z/ z" 	
4$5 5QBS8STUUK  	s   B.tmp_dirinit_methodc                    t        j                         at        j                  t        j
                  d<   t	        j                  t        j                  j                  t        j                  d             t	        j                  t        j                  j                  t        j                  d             t        j                  j                  t        j                  d      }t	        j                  |       | | t        j
                  d<   y t        t        j                  j                  |d      z   t        j
                  d<   y )NTEMP_DIRr   test_dirinit_dirINIT_METHODshared_init_file)
tempfileTemporaryDirectoryrG  namerk   rl   mkdirpathjoinr   )rH  init_dir_paths     r5   initialize_temp_directoriesrV  N  s    ))+G$\\BJJzHHRWW\\',,	23HHRWW\\',,
34GGLLz:MHH]$/

=!$/"'',,-3
 %


=!r4   c                  :    t         t         j                          y y r`   )rG  cleanupr3   r4   r5   cleanup_temp_dirrY  _  s     r4      c            	       6    e Zd ZdZdZdefdZedefd       Zede	fd       Z
d Z	 dded	edd
f fdZd fdZd fdZdefdZddZddZ G d de      Zede	fd       Zede	dededd
fd       Zdedd
fdZddZddZddZedefd       Z xZS )MultiProcessTestCaser   
   r   c                      yr   r3   selfs    r5   _should_stop_test_suitez,MultiProcessTestCase._should_stop_test_suite|  s    r4   c                      y)NTr3   r_  s    r5   destroy_pg_upon_exitz)MultiProcessTestCase.destroy_pg_upon_exit  s    r4   c                     t         S r`   DEFAULT_WORLD_SIZEr_  s    r5   rq   zMultiProcessTestCase.world_size      !!r4   c                 V    t              fd       }t        j                  ||       S )Nc                 j    | j                   | j                  k(  r| j                         y          y r`   )r%  MAIN_PROCESS_RANK_join_processesr`  r;  s    r5   rs   z1MultiProcessTestCase.join_or_run.<locals>.wrapper  s(    yyD222$$R(r4   r	   types
MethodTyper`  r;  rs   s    ` r5   join_or_runz MultiProcessTestCase.join_or_run  .    	r	 
	 ..r4   method_name
methodNameNc                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wNrunTestzno such test method in : super__init__getattrsetattrrq  AttributeErrorr   	__class__r`  rs  rt  r;  er  s        r5   r{  zMultiProcessTestCase.__init__      
 "$K%		{+BD+t'7'7';< 	Y& !-dnn-=R
|L '	   (A 	A6!A11A6c                     t         |           i | _        g | _        g | _        | j
                  | _        t        j                  d      j                  | _
        i | _        y )NFdelete)rz  setUpspecial_return_code_checksskip_return_code_checks	processesrj  r%  rO  NamedTemporaryFilerQ  	file_namepid_to_piper`  r  s    r5   r  zMultiProcessTestCase.setUp  sT     13' .0$**	!44EBGGr4   c                 r    t         |           | j                  D ]  }|j                           g | _        y r`   )rz  tearDownr  	terminate)r`  pr  s     r5   r  zMultiProcessTestCase.tearDown  s3     	AKKM	 r4   c                 F    | j                         j                  d      d   S r  idr   r_  s    r5   _current_test_namez'MultiProcessTestCase._current_test_name  s    wwys#B''r4   c                    g | _         t        t        | j                              D ]  }t        j
                  j                         \  }} || j                  j                  dt        |      z   || j                         | j                  |fdt        | dd      i      }|j                          t        j                  d||j                          || j"                  |j                   <   | j                   j%                  |        y )Nprocess fake_pgF)targetrQ  ro   rp   Started process %s with pid %s)r  r-  r0   rq   rm   multiprocessingPiper  _runr2   r  r  r|  startloggerinfopidr  r0  )r`  procr%  parent_conn
child_connprocesss         r5   _start_processesz%MultiProcessTestCase._start_processes  s    #doo./ 	+D&+&;&;&@&@&B#K~~**#d)+D335t~~zRwtY>	G MMOKK8$L,7DW[[)NN!!'*	+r4   c                     	 t         j                  j                  d       t         j                  j	                  d      j
                  }| j                  |       y # t        $ r Y Fw xY w)Nspawn)rm   r  set_start_methodRuntimeErrorget_contextProcessr  )r`  r  s     r5   _spawn_processesz%MultiProcessTestCase._spawn_processes  s[    	!!227; $$009AAd#	  		s   A 	A('A(c                       e Zd ZdZy)MultiProcessTestCase.Eventr   N)r-   r.   r/   GET_TRACEBACKr3   r4   r5   Eventr    s    r4   r  r%  c                    t         j                  d|       	 t        j                  j	                  | |g      }| |v r| j
                  rt         j                  d|       y | j                         }t         j                  d||       |t        j                  j                  k(  rt        j                  d      5 }t        j                  |       |j                          |j!                  d       | j#                  |j%                                t         j                  d|       d d d        ||v ry # 1 sw Y   xY w)Nz*Starting event listener thread for rank %sz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)r  debugr  
connectionwaitclosedrecvr  r\  r  r  rO  r  faulthandlerdump_tracebackflushseeksendread)parent_pipesignal_piper%  ready_pipeseventtmp_files         r5   _event_listenerz$MultiProcessTestCase._event_listener  s   A4H)4499;:TUKk)%%LLT #((*=udK066DDD!44$? G8$33H= ( a(#((9$?FG k)5  G Gs   :A,D55D>	test_namer  c                 T     | |      }||_         ||_        |j                  ||       y r`   )r%  r  run_testclsr%  r  r  r  rp   r`  s          r5   r  zMultiProcessTestCase._run  s)     9~	"i-r4   c           	          t         j                  j                  d      \  }}t        j                  t
        j                  ||| j                  fd      }|j                          t        j                  dk7  r2t        j                  dk7  rt         j                  j                  d       dt        j                  d<   	  t        | |              ||j;                  d        |J |j=                          |j?                          | j@                  r	 tC        jD                          y y # t         j"                  $ r[}t$        j'                  d	| j                  |t)        |             t        j*                  t,        d
   j.                         Y d }~d }~wt0        $ r t$        j3                  dt5        j6                         | j                  t
        j8                         |j;                  t5        j6                                t        j*                  t
        j8                         Y Zw xY w# ||j;                  d        |J |j=                          |j?                          w xY w# tF        tH        f$ r Y y w xY w)NF)duplexT)r  ro   daemonr   darwinr   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %srK   z;Caught exception: 
%s exiting process %s with exit code: %s)%rm   r  r  	threadingThreadr\  r  r%  r  rh   r   _C'_set_print_stack_traces_on_fatal_signalrk   rl   r|  unittestSkipTestr  r  r2   ri   rj   r+   	Exceptionr   	traceback
format_excTEST_ERROR_EXIT_CODEr  rT  closerc  r   destroy_process_groupAssertionErrorr   )r`  r  r  signal_recv_pipesignal_send_pipeevent_listener_threadses          r5   r  zMultiProcessTestCase.run_test  s   -2-B-B-G-Gu-G-U** ) 0 0'77/;!

 	##%<<7"s||x'? HH<<TB36

/0	 $GD)$&(  + %%d+(444!&&($$ **,	 %7    	6KKF		B	 HHZ	*4455 		@LLQ$$&		$99	 Y1134HH)>>?		@  + %%d+(444!&&( #J/ sJ    D+ I+ +H,>AFH/ BH,(H/ +H,,H/ /9I(+I=<I=c                    g }t        | j                        D ]h  \  }}|j                  | j                  |j                     }	 |j                  t        j                  j                         |j                  ||f       j |D ]x  \  }}	 |j                  d      rK|j                  rt        j                  d|       ;|j!                         }t        j                  d||       nt        j                  d|       z y # t        $ r"}t        j                  d||       Y d }~d }~ww xY w# t        $ r!}t        j                  d||       Y d }~d }~ww xY w)NzBEncountered error while trying to get traceback for process %s: %sr   z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumerater  exitcoder  r  r  r\  r  r  r0  ConnectionErrorr  r   pollr  r  r  )r`  pipesr>  r  piper  r%  r  s           r5   _get_timedout_process_tracebackz4MultiProcessTestCase._get_timedout_process_tracebackH  s>   #DNN3 	JAw'''4II288FFGLL!T+	   	JD$99Q<{{S  ! $		ILLEtY LLPRV!	 ' LL\ 6 # X s6   <D3D/ >D/	D,
D''D,/	E8EEc                    t        | j                               }t        j                         }d}	 	 t        | j                        D ]w  \  }}|j
                  t        j                  k(  s$t        d| d|j
                   d       t        j                  j                         }|D ]  }|j                           d} n |rnt        d | j                  D              rntt        j                         |z
  }	|	|kD  rA| j                          t        d| d       | j                  D ]  }|j                           nt        j                  d	       #t        j                         |z
  }
| j!                  ||
       | j"                  j%                         D ]  }|j'                           y # | j"                  j%                         D ]  }|j'                           w xY w)
NFTProcess z terminated with exit code z", terminating remaining processes.c              3   8   K   | ]  }|j                   d u  y wr`   )r  )r   r  s     r5   r   z7MultiProcessTestCase._join_processes.<locals>.<genexpr>  s     F!qzz-Fs   zTiming out after z" seconds and killing subprocesses.g?)r  r  timer  r  r  r\  r  printrm   r  active_childrenr  allr  sleep_check_return_codesr  r6  r  )r`  r;  r  
start_timesubprocess_errorr>  r  r  acelapsedelapsed_timer  s               r5   rk  z$MultiProcessTestCase._join_processesr  s   dggi(YY[
 &	%dnn5 DAq zz%9%N%NN&qc)DQZZLPrs +0*?*?*O*O*Q"1 +BLLN++/( $Ft~~FF))+
2W$88:+G94VW "^^ &&

3= @  99;3L$$R6 ((//1 

((//1 

s   9F. +DF. .1Gc           
         | j                   st        j                  d       y| j                   d   }t        | j                         D cg c]&  \  }}|j                  t
        j                  k(  r||f( }}}|r[d}|D ]I  \  }}| j                  |j                     j                         }	|d| dt
        j                   d|	 dz  }K t        |      t        | j                         D ]#  \  }}|j                  t        d| d	| d
       || j                  v ryt        j                         D ]q  }
|j                  |
j                  k(  st        r1t        j!                  d| j#                         |
j$                          yt'        j(                  |
j$                         d}|| j*                  v r| j*                  |   }| j-                  |j                  |d| d|j                   d|j                          yc c}}w )z
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        z<Note: no subprocesses were spawned, test was likely skipped.Nr    r  z exited with error code z and exception:

 terminated or timed out after  seconds6Skipping %s on sandcastle for the following reason: %szExpected exit code z	 but got z
 for pid: )r   )r  r  warningr  r  r\  r  r  r  r  r  r  rj   r6  r+   r   r  r  r,   r  r  r  assertEqual)r`  r;  r  first_processr>  r  errored_processesr   r  error_messageskipexpected_return_codes               r5   r  z(MultiProcessTestCase._check_return_codes  s#    ~~NNN q) "$..1
1zz1FFF F
 

 E/ 
7 $ 0 0 = B B Dqc!9:N:c:c9d e''4oR9 u%% dnn- 	DAqzz!"qc!@hW 	 ---%%' 	:D%%7 
 KKP	
 "++DLL99	:"  ! 000#'#B#B2#F "" %&:%;9]E[E[D\\fgtgxgxfyz 	 	
g
s   
+Hc                      | j                   dk(  S )Nr   r%  r_  s    r5   r  zMultiProcessTestCase.is_master  s    yyA~r4   rw  rw  r   N)r-   r.   r/   rj  r  boolra  propertyrc  r0   rq   rq  r2   r{  r  r  r  r  r  r   r  staticmethodr  classmethodr  r  r  rk  r  r  __classcell__r  s   @r5   r\  r\  s  s8   
   d   "C " "/ ?H8;	&"(C (+"$    < ..#&.36.	. .5# 5t 5n(T*XJ
X 4  r4   r\  c                   >     e Zd Z fdZd ZdefdZddZd Z xZ	S )DistributedTestBasec                     t         |           t        | j                        t        j
                  d<   | j                          y )Nrf   )rz  r  r2   rq   rk   rl   r  r  s    r5   r  zDistributedTestBase.setUp  s/    #&t#7

< r4   c                     	 t         j                  j                          	 t	        j
                  | j                         y # t        $ r Y ,w xY w# t        $ r Y y w xY wr`   )rm   distributedr  r  rk   remover  OSErrorr_  s    r5   r  zDistributedTestBase.tearDown  sU    	335	IIdnn%  		  		s"   A A 	AA	AAr   c                 "    d|v ryd|v ryd|v ryy)Nr'   r$   r[   r&   r(   r%   rV   r3   )r`  rb   s     r5   r   zDistributedTestBase.backend  s$    Vf_f_r4   c                    || j                   }t        j                  |      j                         }t        j                  j                  | j                  |      }t        j                  j                  | j                  |      || j                  |       d| j                  |      v sd| j                  |      v r)t        j                  j                  | j                         t        j                  j                  j                         S )Nr   rq   r%  storer$   r%   )rq   rm   get_device_modulern   r  	FileStorer  init_process_groupr   r%  acceleratorset_device_indexdistributed_c10d_get_default_group)r`  rb   rq   num_visible_devicesr  s        r5   	create_pgzDistributedTestBase.create_pg  s    J#55f=JJL!!++DNN<OP,,LL(!	 	- 	
 T\\&))Vt||F7K-K..tyy9  11DDFFr4   c                     t        j                  |      j                         }t        | j                        D ci c]	  }|||z  g c}S c c}w r`   )rm   r  rn   r-  rq   )r`  rb   r   r>  s       r5   rank_to_devicez"DistributedTestBase.rank_to_device  sG    #55f=JJL6;DOO6LMA++,,MMMs   Ar`   )
r-   r.   r/   r  r  r2   r   r!  r#  r  r  s   @r5   r  r    s%     
 GNr4   r  subtest_configtest_fntest_kwargsc                    t        |j                               }|D cg c]  }|d   	 }}|D cg c]  }|d   	 }}t        j                  | D ]  }	t	        t        ||	            }
 | j                  di |
5  t        j                  j                           ||i ||
 t        j                  j                          ddd       t        j                           yc c}w c c}w # 1 sw Y   *xY w)a\  
    Runs a test function given by ``test_fn`` as a subtest according to the
    configurations specified by ``subtest_config``. This amortizes the
    costly setup overhead (including process spawn and initializing the
    process group) over the subtests.

    Args:
        subtest_config (Dict[str, List[Any]]): A mapping from subtest
            keyword argument name to a list of its possible values.
        test_fn (Callable): A callable that runs the actual test.
        test_args: Positional arguments to pass to ``test_fn``.
        test_kwargs: Keyword arguments to pass to ``test_fn``.
    r   r   Nr3   )rA  items	itertoolsproductdictzipsubTestrm   _dynamoresetr   r   )cls_instr$  r%  	test_argsr&  subtest_config_itemsitemsubtest_config_keyssubtest_config_valuesr6  subtest_kwargss              r5   run_subtestsr7  "  s    * 9=^=Q=Q=S8T:N%O$d1g%O%OBV-W$d1g-W-W##%:; c"5v>?X// 	"MM!Y@+@@MM!	" 	 &P-W	" 	"s   C C%:AC**C3	c                      t         t         S 	 t        j                  g dd      j                  dk(  a t         S # t        $ r
 da Y t         S w xY w)a   
    If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
    Libfabric EFA interfaces and EFA software components installed,
    see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
    )fi_infoz-pefaz-t	FI_EP_RDMF)checkr   )EFA_PROBE_RESULT
subprocessrun
returncodeFileNotFoundErrorr3   r4   r5   has_efarB  H  s]     #!NN;5j 	   ! !s   &: AAc                  "    t               rddgS dS )a  
    If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
    'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
    uses InfiniBand transport, so we exclude it from tensorpipe transports,
    see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
    shmuvN)rB  r3   r4   r5   tp_transportsrF  ^  s     $IE4=/4/r4   c                 d      t        t        |      S d t                fd       }|S )z+
    Wrapper to use with a test method
    )r  rq   c                      t               t        j                         }fd fd}g }t               D ]=  }t	        j
                  |||f      }|j                          |j                  |       ? |S )Nc                  >     t         j                  j                  k(  S r`   r   r  _worldworlds   r5   world_is_validzaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_validw      D118888r4   c                 ~   t        j                  d| |       	                  rt        j                          y y # t        $ rR}t        j                  j                  | t        j                         f       t        j                  |       Y d }~sd }~ww xY w#         rt        j                          w w xY w)Nthreadedr   r%  rq   r  )r   r  BaseExceptionMultiThreadedTestCaseexception_queueputrh   exc_infor"   exception_handler  )r%  world_pgr  excallbackrN  rq   s       r5   workerzYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.workerz  s    ##"*E
1
 "#..0 $ ! %55994:PQ!22  "#..0 $s*   A   	B	ABB BB B<r  ro   )r    r   	HashStorer-  r  r  r  r0  )	rq   r[  global_storer\  threadsr%  trM  rN  s	   ``     @@r5   #_run_test_method_with_multi_threadszIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threadss  sq    $&~~'	9	1  *% 	D  dE<5PQAGGINN1	
 r4   c                 X    t         j                  j                  j                  d       	   fd      }t        j                  |       t         j                  j                  j                  d       y # t         j                  j                  j                  d       w xY w)NTc                       g i S r`   r3   )ro   rr   rp   r`  s   r5   r   z?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>  s    D$?$?$? r4   F)rm   r  _distributed_c10d_set_thread_isolation_moderT  _join_threads)r`  ro   rp   r`  rb  rr   rq   s   ``` r5   rs   z-spawn_threads_and_init_comms.<locals>.wrapper  sv     	""==dC	I9?G "//>HH&&AA%HEHH&&AA%Hs   %A> >+B))r   spawn_threads_and_init_commsr	   )rr   r  rq   rs   rb  s   ` ` @r5   rh  rh  h  sD     |('j
 	
> 4[
I 
I Nr4   c                       e Zd ZdZ ej
                         ZdZd Z	 dde	de	ddf fdZ
d	 Zd
 Zd fdZ fdZd Zed        Zd Zed        Zed        Zedefd       Zede	fd       ZddddZddddZ xZS )rT  a5  
    Test runner that runs all tests with the in-proc process group using
    multiple threads with the threaded process group.

    Each test spawns world_size threads and run the test method in each thread.

    Difference from regular MultiProcess test runner:
    Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
    Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
        to set up / tear down each thread when running each test.
    No global state possible
        How bad of a limitation is this?
    r   c                 V    t              fd       }t        j                  ||       S )Nc                     | j                   | j                  k(  r| j                  | j                         y          y r`   )r%  MAIN_THREAD_RANKrg  r`  rl  s    r5   rs   z2MultiThreadedTestCase.join_or_run.<locals>.wrapper  s.    yyD111""4<<4r4   rm  rp  s    ` r5   rq  z!MultiThreadedTestCase.join_or_run  rr  r4   rs  rt  r   Nc                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wrv  ry  r  s        r5   r{  zMultiThreadedTestCase.__init__  r  r  c                      y r`   r3   r_  s    r5   perThreadSetUpz$MultiThreadedTestCase.perThreadSetUp  s    r4   c                      y r`   r3   r_  s    r5   perThreadTearDownz'MultiThreadedTestCase.perThreadTearDown  s    r4   c                 x    t         |           | j                  | _        g | _        dt
        j                  d<   y)z
        setUp only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadSetUp
        r   r  N)rz  r  rl  r%  r`  rk   rl   r  s    r5   r  zMultiThreadedTestCase.setUp  s1    
 	))	36

/0r4   c                 0    t         |           g | _        y)z
        tearDown only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadTearDown
        N)rz  r  r`  r  s    r5   r  zMultiThreadedTestCase.tearDown  s    
 	r4   c                    t         j                  j                  j                  d       | j                  }t               t        j                         | j                  _	        fd} |       st        d      t        | j                        D ]e  }t        j                  | j                  j                  ||| j                  f      }|j!                          | j"                  j%                  |       g y)zk
        class method to spawn threads and run test, use this method in the SetUp of your TestCase
        Tc                  >     t         j                  j                  k(  S r`   rJ  rL  s   r5   rN  z<MultiThreadedTestCase._spawn_threads.<locals>.world_is_valid  rO  r4   zInvalid worldr]  N)rm   r  re  rf  r  r    r   r^  r  r_  r  r-  rq   r  r  r  r  r`  r0  )r`  r  rN  r%  ra  rM  s        @r5   _spawn_threadsz$MultiThreadedTestCase._spawn_threads  s     	""==dC++	$&&*nn&6#	9 //$//* 	#D  ~~**)T4??1SA GGILL"	#r4   c                     | |      }||_         t        |d      rWt        j                         |_        t
        j                  |j                  _        t
        j                  |j                  _	        |j                  |||       y )N_tls)r%  hasattrr  localrx  r   
_precision	precision_rel_tolrel_tolrun_test_with_threaded_pg)r  r  r%  rq   rp   r`  s         r5   r  zMultiThreadedTestCase._run  sb    9~	 4 !)DI"*"5"5DII ( 1 1DII&&y$
Cr4   c                    t        j                  d||| j                  j                         | j	                          	  t        | |              t        j                          | j                          y# t        $ rN}| j                  j                  |t        j                         f       t        j                  |       Y d}~wd}~ww xY w# t        j                          | j                          w xY w)zd
        Run the current test associated with `test_name` using the threaded process group.
        rQ  rR  N)r   r  r  r_  ro  r|  rS  rU  rV  rh   rW  r"   rX  r  rq  )r`  r  r%  rq   rZ  s        r5   r  z/MultiThreadedTestCase.run_test_with_threaded_pg  s     	!..--		
 			%$GD)$& &&(""$  	  $$dCLLN%;<.. 	 &&(""$s*   A5 5	C>ACC CC &C5c           
         t         }	 t        |      D ]f  \  }}|j                  t        d|             |j	                         s2t
        j                  j                  |t        t        d| d      d ff       h t        j                          g }| j                  j                         sF| j                  j                         }|j                  |       | j                  j                         sFt                t        j                   j"                  j%                  d       | j'                  |||       y # t                t        j                   j"                  j%                  d       w xY w)Nr   zRank failed to join in under r  F)r  r  rT  maxis_aliverT  rU  rV  TimeoutErrorr"   r/  emptyr   r0  r!   rm   r  re  rf  r  )r  r`  r;  r  idxthreadfailed_ranksfailures           r5   rg  z#MultiThreadedTestCase._join_threads*  s-   !	I(1 VC7O,??$)99== , ,&CG9H$U!" !%	 ##%L))//1--113##G, ))//1 #$HH&&AA%Hgr: #$HH&&AA%Hs   <D9 B,D9 95E.c           	         d}d}|D ]1  \  }}|d   }t        |t        j                        r;t        j	                  d||t        |             |dk  sMt        d   j                  }at        |t              r)d| d| d	}	t        j                  |	       t        |	      t        |t              rEdj                  t        j                  |       }	t        j                  d
|	|       |d| d|	 dz  }t        |t              st!        |j"                        t$        k(  s|dk  s&|j"                  }4 t'        |      dkD  rt        |      |dkD  rqt        j)                         D ]Y  }
||
j                  k(  st*        r#t        j	                  d||
j,                          y t        j                  |
j,                         y y )Nr  r   r   z3Thread %s skipping test %s for following reason: %sr   rK   zThread r  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:
r  r  )
isinstancer  r  r  r  r2   rj   r+   r  r   r  r  rT  r  format_exception
SystemExitr   coder0   lenr6  r   r,   )r  r  r  r;  	error_msg	skip_coder%  rW  excr   r  s              r5   r  z)MultiThreadedTestCase._check_return_codesH  s    		* 	)ND(1+C#x001IH	 q= *9 5 ? ?IC.v%DWIZXS!"3''C+ggi88(CDGdSwtf,EcU"MM	C,>S(Y] #I+	)0 y>Ay))q="))+ >.$T LL
 &//==> r4   c                     t         S r`   re  r_  s    r5   rq   z MultiThreadedTestCase.world_sizez  rg  r4   c                 F    | j                         j                  d      d   S r  r  r_  s    r5   r  z(MultiThreadedTestCase._current_test_name~  s     wwys#B''r4   r   r  c                J    | j                   |k(  r| j                  |||       yy)z
        The reason why we have this util function instead of
        self.assertEqual is all threads are sharing one CPU RNG
        so the assertion result is only reliable on rank 0
        N)r%  r  r`  r   yr   r%  s        r5   assertEqualOnRankz'MultiThreadedTestCase.assertEqualOnRank  s'     99Q3' r4   c                H    | j                   |k(  r| j                  ||       y y r`   )r%  assertNotEqualr  s        r5   assertNotEqualOnRankz*MultiThreadedTestCase.assertNotEqualOnRank  s#    991% r4   r  r  r`   )r-   r.   r/   __doc__queueQueuerU  rl  rq  r2   r{  ro  rq  r  r  rv  r  r  r  rg  r  r	  r0   rq   r  r  r  r  r  s   @r5   rT  rT    s     "ekkmO/ ?H8;	&	7#. D D%. ; ;: /> />b "C " " (C ( (( (&1 & &r4   rT  c                        e Zd Zdeej
                  ej                  f   deddf fdZ	dej                  dej                  fdZ
 xZS )SaveForwardInputsModuleforward_inputscast_forward_inputsr   Nc                 t    t         |           t        j                  dd      | _        || _        || _        y )Nd   )rz  r{  nnLinearlr  r  r`  r  r  r  s      r5   r{  z SaveForwardInputsModule.__init__  s2    
 	3$,#6 r4   r   c                     || j                   | <   | j                  | j                  r3|j                  | j                  j                  j
                              S |      S r`   )r  r  r  toweightdtyper`  r   s     r5   forwardzSaveForwardInputsModule.forward  sI    $%D!vv43K3Kadd466==../SSQRSSr4   r-   r.   r/   r+  r  Modulerm   Tensorr  r{  r  r  r  s   @r5   r  r    sT    7RYY457 "7 
	7T T%,, Tr4   r  c                        e Zd Zdeej
                  ej                  f   deddf fdZ	dej                  dej                  fdZ
 xZS )SaveForwardInputsModelr  r  r   Nc                 t    t         |           t        ||      | _        t        ||      | _        || _        y r`   )rz  r{  r  c1c2r  r  s      r5   r{  zSaveForwardInputsModel.__init__  s6    
 	).:MN).:MN,r4   r   c                 `    || j                   | <   | j                  | j                  |            S r`   )r  r  r  r  s     r5   r  zSaveForwardInputsModel.forward  s)    $%D!wwtwwqz""r4   r  r  s   @r5   r  r    sQ    -RYY45- "- 
	-# #%,, #r4   r  c              #     K   |st         j                  j                  |        dt        j                  d<   dt        j                  d<   |rp|rVt         j
                  j                  j                  j                  j                         }t        j                  d|| |       nt        j                  || |       t         j                  j                          t         j                  j                  j                  j!                          	 d  t         j                  j                          t         j                  j                  j                  j!                          |rt        j"                          y y # t         j                  j                          t         j                  j                  j                  j!                          |rt        j"                          w w xY ww)Nr   MASTER_ADDR6789MASTER_PORTfaker  )r   r%  rq   )rm   r  r  rk   rl   testing	_internalr  r  	FakeStorer   r  r.  r/  utilscountersclearr  )r%  rq   r   init_pgr  r  s         r5   _dynamo_dist_per_rank_initr    sJ     **40 +BJJ} &BJJ}MM++77??IIKE##%	 ##G$:V	MM	MM  &&()$$**,&&(  	$$**,&&( s    D
G%E9 A(G%9A)G""G%c                   @     e Zd ZdZe fd       Ze fd       Z xZS )#DynamoDistributedSingleProcTestCasez
    Test harness for single-process dynamo distributed tests,
    initializes dist process group.

    Prefer this for simple tests, as it's easier to debug.
    c                 `   t         |           | j                  j                  t	        j
                  t        j                  ddd             d| _        d| j                   | _	        d| j                  v rd n| j                  g| _
        t        j                  d| j                  d	       y )
Nr   12355)r  r  r   zcuda:r'   r$   r   )r%  rq   )rz  
setUpClass_exit_stackenter_contextr   r+  rk   rl   r%  rb   
device_idsr   r  r  r  s    r5   r  z.DynamoDistributedSingleProcTestCase.setUpClass  s    %%JJ

#.#*	
 SXXJ'
!'3::!5CHH:SXX!Dr4   c                 J    t        j                          t        |           y r`   )r   r  rz  tearDownClassr  s    r5   r  z1DynamoDistributedSingleProcTestCase.tearDownClass  s    ""$r4   )r-   r.   r/   r  r  r  r  r  r  s   @r5   r  r    s2     E E"    r4   r  c            	       H    e Zd ZdZedefd       Zededededdfd       Z	y)	"DynamoDistributedMultiProcTestCasea   
    Use this for tests that actually run on multiple GPUs.

    Decorate tests with @skip_if_lt_x_gpu(ngpu)

    Note: MultiProcTestCase spawns processes per test and is slow.
    Prefer MultiThreadedTestCase for most tests. Perhaps use this one
    sparingly for integration tests.
    r   c                 >    t         j                  j                         S r`   )rm   r  rn   r_  s    r5   rq   z-DynamoDistributedMultiProcTestCase.world_size  s      --//r4   r%  r  r  Nc                     t        j                  t        j                                 | |      }||_        ||_        |j                  ||       y r`   )r   
addHandlerloggingNullHandlerr%  r  r  r  s          r5   r  z'DynamoDistributedMultiProcTestCase._run   sB     	W0023 9~	"i-r4   )
r-   r.   r/   r  r	  r0   rq   r  r2   r  r3   r4   r5   r  r    sV     0C 0 0 	.	.#&	.36	.		. 	.r4   r  c                   Z    e Zd ZU dZdZeed<   dZeed<   dZe	e
   ed<    ed      Zeed	<   d
Zeed<   ede	e
   fd       Zede
fd       Zedd       Zed        Zede
ddfd       Zed        Zedd       Ze fd       Ze fd       Zd fdZd Z	 dde
de
ddf fdZ xZS )MultiProcContinousTestr   rq   r%  N	rdvz_filex   )secondsr  Fpoison_pillr   c                      y)z
        ProcessGroup backend str.
        To be customized by sub test classes, e.g. "nccl".
        Otherwise we return None -- lazily decided by tensor.
        Nr3   )r  s    r5   backend_strz"MultiProcContinousTest.backend_str       r4   c                 \    t         j                  j                         }|y|j                  S )Ncpu)rm   r  current_acceleratorr   )r  curr_devices     r5   device_typez"MultiProcContinousTest.device_type%  s+    '';;=r4   c                      y)z
        ProcessGroup init options.
        To be customized by sub test classes, e.g. ProcessGroupNCCLOpTest
        Here we return None.
        Nr3   )r  high_priority_streams     r5   optszMultiProcContinousTest.opts,  r  r4   c                     |J t        j                  ||      }t        j                  | j                         |||| j	                         | j
                         t         j                  j                         | _        y )N)r   rq   r%  r  
pg_optionsr  )	r   r  r  r  r  r  r  r  pg)r  r%  rq   r  r  s        r5   _init_pgzMultiProcContinousTest._init_pg5  si    $$$y*5 	OO%!xxzKK	
 &&99;r4   r  c                     |j                  d      d   } | |      }| j                  |_        | j                  |_        t        ||      } |di | y )Nr  r   r3   )r   r%  rq   r|  )r  r  rp   r  r`  r%  s         r5   _run_test_given_idz)MultiProcContinousTest._run_test_given_idE  sM     MM#&r*	9~HH	..$	*&r4   c                    d|cxk  r|k  sJ  J || _         || _        | j                  |||       t        j	                  d       	 |j                         }t        j                  d|        |n$	 | j                  |       |j                  |       Ot        j	                  d       t        j                          y # t        $ r}|j                  |       Y d }~Jd }~ww xY w)Nr   zSetup completez	Got test zTerminating ...)r%  rq   r  r  r  r   r  r  rV  rS  r   r  )r  r%  rq   r  
task_queuecompletion_queuer  rZ  s           r5   _worker_loopz#MultiProcContinousTest._worker_loopQ  s     D%:%%%%%# 	T:y1 	$%  nn&GLL9WI./)&&w/ $$W-   	%&""$ ! ) $$R(()s   4"C 	C%
C  C%c                    g | _         g | _        g | _        t        j                  d      j
                  | _        	 t        j                  j                  d       t        t        |            D ]	  }t        j                  j                         }t        j                  j                         }t        j                  j                  | j                  dt!        |      z   d||| j                  ||f      }|j#                          | j                   j%                  |       | j                  j%                  |       | j                  j%                  |       t&        j)                  d||j*                          y # t        $ r Y .w xY w)NFr  r  r  T)r  rQ  r  ro   r  )r  task_queuescompletion_queuesrO  r  rQ  r  rm   r  r  r  r-  r0   r  r  r  r2   r  r0  r  r  r  )r  rq   r%  r  r  r  s         r5   r  z'MultiProcContinousTest._spawn_processess  sA    " 335AFF	!!227;
 #j/* 	D..446J$44::<++33''#d)+JzCST	 4 G MMOMM  )OO"":.!!(()9:KK0$		  		s   E= =	F
	F
c                    t         |           | j                         }| j                  dk(  rPt	        j
                  |      j                         | _        | j                  dk(  rt        j                  d| d      t        j                  d| j                   d| j                   d|        | j                  | j                         y)	z
        Class-scope test fixture. Run once for entire test class, before any test starts.
        Set up the process group.
        r  r   zNo z devices availablezTesting class z on  N)rz  r  r  rq   rm   r  rn   r  r  r  r  r-   r  )r  r  r  s     r5   r  z!MultiProcContinousTest.setUpClass  s     	 oo'>>R"44[ANNPCN~~"''#k]:L(MNNS\\N$s~~.>a}M	
 	S^^,r4   c                    t         j                  d| j                   d       | j                  D ]  }|j	                  d        | j
                  D ]  }|j                           	 t        j                  | j                         t         j                  d| j                   d       t        | 9          y# t        $ r Y =w xY w)z
        Class-scope test fixture. Run once for entire test class, after all tests finish.
        Tear down the process group.
        zJoining z workersNzClass z	 finished)r  r  rq   r  rV  r  rT  rk   r  r  r  r  r-   rz  r  )r  r  r  r  s      r5   r  z$MultiProcContinousTest.tearDownClass  s     	x/x89// 	!JNN4 	! }} 	GLLN		IIcmm$ 	fS\\N)45	  		s   )B: :	CCc                    t         |           | j                  | _        | j                  j
                  r&t        j                  d| j                                t        | j                        D ]M  \  }}t        j                  d| d| j                                 |j                  | j                                O y)z5
        Test fixture. Run before each test.
        zPrevious test failed, skipping zSending Rank rx  N)rz  r  rj  r%  r  r  r  r  r  r  r  r  r  rV  )r`  r>  r  r  s      r5   r  zMultiProcContinousTest.setUp  s     	 **	 >>%%##&Edggi[$QRR 't'7'78 	&MAzLL=2dggi[9:NN4779%	&r4   c                 V    t              fd       }t        j                  ||       S )Nc           	      0   | j                   | j                  k(  rt        j                  d| j	                                 t        | j                        D ]  \  }}|j                         }t        |t              rSt        j                  d| d| j	                          d| j                  j                          d| j                  _        ||| j	                         k(  sJ t        j                  d| d| j	                                  y          y )NzWaiting for workers to finish zDetected failure from Rank z in: z(, skipping rest of tests in Test class: TzMain proc detected rank z
 finished )r%  rj  r  r  r  r  r  r   r  rS  r  r  r-   r  )r`  r>  r  rvr;  s       r5   rs   z=MultiProcContinousTest._worker_run_main_wait.<locals>.wrapper  s    yyD222=dggi[IJ+4T5K5K+L 'A')--/B!"m49!E$'') MEEI^^E\E\D]_ 6:2  ?*?LL21#Z	{K( r4   rm  rp  s    ` r5   _worker_run_main_waitz,MultiProcContinousTest._worker_run_main_wait  s.    	r	 
	4 ..r4   rs  rt  c                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wrv  )rz  r{  r|  r}  r  r~  r   r  r  s        r5   r{  zMultiProcContinousTest.__init__  s    
 "$K%		{+BD+t'A'A"'EF 	Y& !-dnn-=R
|L '	r  )Fr  r  )r-   r.   r/   rj  rq   r0   r1   r%  r  r   r2   r   r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r{  r  r  s   @r5   r  r    sJ   JD#N#Ix}#"3/GY/KHSM    C       < < 	 	4 	 	 % %B  @ - -*    .&$/F ?H8;	 r4   r  r`   r   )r   r  )r$   TF)r  r)  r  r  r9  rk   r  r>  rh   rO  r  r  r  rn  r  
contextlibr   dataclassesr   datetimer   enumr   	functoolsr   r   r	   ior
   typingr   r   r   r   r   unittest.mockr   rm   torch._dynamo.test_casetorch.cuda.nccltorch.distributedr  r   torch.nnr  torch._C._autogradr   torch._C._distributed_c10dr   torch._logging._internalr   $torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   r   r   r   5torch.testing._internal.distributed.multi_threaded_pgr    r!   r"   	getLoggerr-   r  setLevelINFOr   ra   HAS_ACCELERATORr*   rj   rR   rc   rv   r}   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rb   r0   r  r   r	  r  getenvr  r  r  r$  r?  r2   rF  rG  rP  r1   rV  rY  rf  r\  r  r+  rA  r7  r=  rB  rF  rh  rT  r  r  r  r  r.  	test_caser  r  r  r3   r4   r5   <module>r     s        	   
       % !   , ,  = =        ) 7 .     
		8	$  4 E? 3x38z 
8
C x$FG	
 Xb"BC x45 8B => 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? HR>?  (267!" hr#LM#$ x
V%* 8B DE+, hr#BC-
4 * * **&6 $R$+\4	
:
F Fc F# F$ F  	a 
 
: O)"))$GOPO,c2  +.'(
T 
IC I 2 2(S (c (s (XS 3 2 26(--	. 5
Xc] 
d 
"  y8 yB+N. +N\d3i( 
 F   ,0 
3E7tl&H l&^Tbii T #RYY #  <A) )> %--*A*A*J*J  @.)< .8zX zr4   