
    9j+H                        d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZ d dlmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dl m!Z!m"Z" d dl#m$Z$m%Z%m&Z& d dl'm(Z( d dl)m*Z*m+Z+ d dl,m-Z- ddl.m/Z/m0Z0m1Z1 erd dl2m3Z3 g dZ4 G d de      Z5 ede6      	 	 	 	 d2dede*dejn                  dz  de8d e9d!e"dz  d"efd#       Z: ed$%      e/dddddd$d&ded'e;ejx                  z  dz  de*dz  d!e"dz  dejn                  dz  d e9d(e9d"efd)              Z=e G d* d+             Z> ed$%      dddde5j~                  ddd$d,ded'e;ejx                  z  dz  de*dz  d!e"dz  dejn                  dz  d-e5d.e$dz  d e9d(e9d"ee>z  fd/       Z@ ed$%      ded"efd0       ZA	 	 	 	 	 d3dede*dejn                  dz  de8d e9d!e"dz  d(e9d"efd1ZBy)4    N)Future)	dataclass)Enum)castTYPE_CHECKING)
deprecated)STATE_DICT_TYPE)$_ProcessBasedAsyncCheckpointExecutor)#_ThreadBasedAsyncCheckpointExecutor)_storage_setup)DefaultSavePlanner)_dcp_method_logger)Metadata)SavePlanSavePlanner)AsyncStagerDefaultStagerStagingOptions)Stateful)StorageWriterWriteResult)_get_default_group   )_api_bc_check_DistWrapper_profile)_AsyncCheckpointExecutor)save_state_dictsave
async_saveAsyncCheckpointerTypeAsyncSaveResponsec                       e Zd ZdZdZdZy)r!   z!Enum for async checkpointer type.threadprocessN)__name__
__module____qualname____doc__THREADPROCESS     m/media/conek/DATA/Code/OCR/venv/lib/python3.12/site-packages/torch/distributed/checkpoint/state_dict_saver.pyr!   r!   3   s    +FGr-   r!   za`save_state_dict` is deprecated and will be removed in future versions.Please use `save` instead.)categoryF
state_dictstorage_writerprocess_groupcoordinator_rankno_distplannerreturnc           	          |j                          t               5  t        | |||||      cddd       S # 1 sw Y   yxY w)z3This method is deprecated. Please switch to 'save'.N)resetr   _save_state_dict)r0   r1   r2   r3   r4   r5   s         r.   r   r   :   sF      
 


 
 
s   5>Tlog_exceptionscheckpoint_idr1   r5   r2   r4   use_collectivesr=   r>   c          	      ~   t         j                  j                  d       |xs, t        j                          xs t        j
                          }|rt        j                  dd       t               5  t        t        t        ||d            }t        t        |       |||||      cddd       S # 1 sw Y   yxY w)	a;  
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` , and ``DTensor`` by having each rank only save their local shards.

    For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
    save will call ``state_dict`` before serialization.

    .. warning::
        There is no guarantees of Backwards Compatibility across PyTorch versions
        for saved state_dicts.

    .. warning::
        If using the `process_group` argument, make sure that only its ranks
        call `save_state_dict` and that all data in state_dict belong to it.

    .. note::
        When saving checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one of
        the shard_group should be calling `save_state_dict` and the corresponding process
        group needs to be passed in.

    .. note::
        If no process group is available, this function assumes the intention is to save the
         state_dict in the local process.

    .. note:
        Rank 0 is assumed to be the coordinator rank.


    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        no_dist (bool):
            If ``True``, this function will assume the intent is to load
            a checkpoint on a single rank/process.
            (Default: ``False``)
        use_collectives (bool): If ``False``, this function will assume the intent is to save
            a checkpoint without using cross-rank synchronization.
            (Default: ``True``)
            This configuration is experimental and should be used with caution.
            It will change the format of the saved checkpoint and may not be backward compatible.

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )

    .. note::
        save_state_dict uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes place.
        In this case, the device used is given by ``torch.cuda.current_device()``
        and it is the user's responsibility to ensure that this is set so that
        each rank has an individual GPU, via ``torch.cuda.set_device()``.
    z!torch.distributed.checkpoint.savezptorch.distributed is disabled, unavailable or uninitialized, assuming the intent is to save in a single process.   
stacklevelF)reader)r0   r1   r2   r4   r5   r>   N)torch_C_log_api_usage_oncedistis_availableis_initializedwarningswarnr   r   r   r   r9   _stateful_to_state_dict)r0   r=   r1   r5   r2   r4   r>   s          r.   r   r   V   s    ~ 
HH  !DEQd//11Q4;N;N;P7PG~	

 
 
>.-PUV
  .z:)'+

 
 
s   36B33B<c                   2    e Zd ZU dZed   ed<   ed   ed<   y)r"   a!  This class contains futures for staging and upload completion.
    It is returned by async_save().
    staging_completion is a future that indicates when local copy
    of state_dict is complete.
    upload_completion is a future that indicates when a checkpoint
    completed saving.
    Nstaging_completionupload_completion)r&   r'   r(   r)   r   __annotations__r,   r-   r.   r"   r"      s     t$d|#r-   r"   )r=   r1   r5   r2   async_checkpointer_typeasync_stagerr4   r>   rQ   rR   c          	          t         j                  j                  d       t        j                         rNt        j
                         r:|xs
 t               }	t        j                  d      |	j                  vrt        d      ,|t        |t              r|nt        t        dddd            t                t        d      dt         t"           t"        z  f fd	       }
 |
       }|t$        j&                  k(  r
t)               n	t+               }|j-                  |||||||
      }t        |t               rh|}t!               }|fdt         t"           dt         d   fd}|j/                         s|j1                  |       n|j3                  d       t5        ||      S t        d      fd       } |        |S )a   Asynchronous version of ``save``. This code first de-stages the state_dict on to the
    staging storage (defaults to CPU memory), and then calls the `save` in a separate thread.

    .. warning::
        This feature is experimental and subject to change.
        MUST CALL CLOSE AFTER LAST CHECKPOINT IS SAVED

    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform 'stage' and  'save'. If
            this is not specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        async_checkpointer_type (AsyncCheckpointerType):
            whether to do checkpoint in separate thread or process
            (Default: ``AsyncCheckpointerType.THREAD``)
        async_stager (AsyncStager):
            provides staging implementation. If storage_writer implements AsyncStager
            and async_stager is provided, async_stager will be used for staging
        no_dist (bool):
            If ``True``, this function will assume the intent is to save
            a checkpoint on a single rank/process.
            (Default: ``False``)
        use_collectives: If False, Save the checkpoint without rank coordination. (Default: ``True``)
            This configuration is experimental and should be used with caution.
            It will change the format of the saved checkpoint and may not be backward compatible.

    Returns:
        Future: A future holding the resultant Metadata object from `save`.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
        >>>
        >>> # ... do some work ...
        >>>
        >>> checkpoint_future.result()

    z'torch.distributed.checkpoint.async_savecpuzfA CPU backend must be enabled for async save; try initializing process group with 'cpu:gloo,cuda:nccl'NFTr:   r6   c                  &     j                        S N)stage)rR   r0   s   r.   stage_state_dictz$async_save.<locals>.stage_state_dict>  s    !!*--r-   r<   original_staging_futurereturn_staging_futurec                     	 | j                          |j                  d        y # t        $ r}|j                  |       Y d }~y d }~ww xY wrV   )result
set_result	Exceptionset_exception)rY   rZ   es      r.   callbackzasync_save.<locals>.callbackX  sB    7'..0%006 7%33A667s   !$ 	AAA)rN   rO   c                  @     j                   r j                          y y rV   ) should_synchronize_after_executesynchronize_staging)rR   s   r.   maybe_synchronize_stagingz-async_save.<locals>.maybe_synchronize_stagingm  s    <<002 =r-   )rD   rE   rF   rG   rH   rI   r   device_device_typesAssertionError
isinstancer   r   r   rL   r   r   r	   r!   r+   r
   r   execute_savedoneadd_done_callbackr]   r"   )r0   r=   r1   r5   r2   rQ   rR   r4   r>   pgrX   staging_future_or_state_dictupload_executorupload_futurestaging_futurerZ   ra   re   s   `     `           r.   r    r       s   T 
HH  !JKt224202<<b&6&66 x  %*^[*Q)L(	L )4Jt,.f_5G . -. $4#5  #&;&C&CC 	-.02  ,88$#%#' 9 M .75.4h 3H	7%+O%<	7#)$<	7 ""$,,X6!,,T2 !4
 	

 
4	0	3 
1	3 	"#r-   c                     i }| j                         D ]-  \  }}d }d| |_          t        d      |      |      ||<   / |S )z]Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object.c                 F    t        | t              r| j                         S | S rV   )ri   r   r0   )elems    r.   _elem_to_state_dictz4_stateful_to_state_dict.<locals>._elem_to_state_dict|  s    (24(B4??$LLr-   z_stateful_to_state_dict.Tr:   )itemsr&   r   )r0   stateful_state_dictkeyrt   ru   s        r.   rL   rL   v  so     %%' 		T	M *B#'G$$
#J#5T#J$

$C 	 r-   c                 `    t         j                  j                  d       t        || |      
t	               t        d      d i }t        dd       x}||d<   j                  |d<   t        di | fd       }	t        di |fd       }
d rj                  d|	|
      n |	       } |
|g      }|d   t        di |fd	       }t        di |fd
       }rj                  d||      }|S  |       } ||g      }j                          |S )Nz,torch.distributed.checkpoint.save_state_dictplanner is Noner=   r2   c                  \   t        d      j                         } dt        j                  j                        j
                  vr4t        j                  dd       j	                  j                         nj	                  | j                         dt        j                  j                        j
                  v r)j                  j                  j                         nj                  j                         j                         }j                  |      }|S )	Nrz   storage_metazThe function definition for SavePlanner.set_up_planner has been updated to include the storage_meta argument. Please update your implementation to include this parameter.r@   rA   )r0   r|   is_coordinatorkwargs)rankr>   )rh   r|   inspect	signatureset_up_planner
parametersrJ   rK   r}   set_up_storage_writerr   create_local_planprepare_local_plan)r|   
local_plandistWr5   r0   r1   r>   s     r.   
local_stepz$_save_state_dict.<locals>.local_step  s   ? !233%224!2!273I3I!J!U!UUMM. 	 "":u/C/CD""%)$33 #    !E!EFQQR 00$$ZZ / 1  001E1EF..0
#66zB
r-   c                 l    t        d      j                  |       \  } j                  |       } | S )Nrz   )rh   create_global_planprepare_global_plan)all_local_plansglobal_metadatar5   r1   s    r.   global_stepz%_save_state_dict.<locals>.global_step  sA     ? !233+2+E+Eo+V((<<_Mr-   planr   c                      t        d      t        d      j                        } j                  |       }|j                          |j	                         S )Nrz   zcentral_plan is None)rh   finish_plan
write_datawaitvalue)final_local_plan
all_writescentral_planr5   r1   s     r.   r   z$_save_state_dict.<locals>.write_data  sc    ? !233 !788"..|<#../?I
!!r-   c                 H    t        d      j                  |        S )Nzglobal_metadata is None)metadataresults)rh   finish)all_resultsr   r1   s    r.   finish_checkpointz+_save_state_dict.<locals>.finish_checkpoint  s.    " !:;;Lr-   writer,   )rD   rE   rF   r   r   rh   getattrgroupr   reduce_scatter
all_reducebarrier)r0   r1   r2   r3   r4   r5   r>   ckpt_kwargsckpt_idr   r   r   global_planr   r   r   write_resultsr   r   r   s   ``   ``          @@@r.   r9   r9     sh    
HH  !OPG5EFE$&.//OK>?DAAN'.O$',{{O$&+&! '!F &+& ' %)L++FJL)|
&1:,&?"1~&+&	" '	" &+& ' ##GZ9JK O	 ,6<$m_5Or-   )Nr   FN)Nr   FNT)Cr   osrJ   concurrent.futuresr   dataclassesr   enumr   typingr   r   typing_extensionsr   rD   torch.distributeddistributedrG   #torch.distributed._state_dict_utilsr	   4torch.distributed.checkpoint._async_process_executorr
   3torch.distributed.checkpoint._async_thread_executorr   +torch.distributed.checkpoint._storage_utilsr   ,torch.distributed.checkpoint.default_plannerr   #torch.distributed.checkpoint.loggerr   %torch.distributed.checkpoint.metadatar   $torch.distributed.checkpoint.plannerr   r   $torch.distributed.checkpoint.stagingr   r   r   %torch.distributed.checkpoint.statefulr   $torch.distributed.checkpoint.storager   r   "torch.distributed.distributed_c10dr   utilsr   r   r   ,torch.distributed.checkpoint._async_executorr   __all__r!   FutureWarningProcessGroupintboolr   strPathLiker   r"   r*   r    rL   r9   r,   r-   r.   <module>r      s=    	  % !  & (    ? G K B : F 
 ; K A 8 8 UD  ! /3"&

!
 $$t+
 	

 
 4
 


. 4( /3+/"&.2 r
r
 $t+r
 "D(	r

 4r
 $$t+r
 r
 r
 r
  )r
j 
$ 
$ 
$ 4( /3+/"&.25J5Q5Q'+ WW $t+W "D(	W
 4W $$t+W 3W $W W W W )Wt 4( O  )& /3"& hh!h $$t+h 	h
 h 4h h hr-   